feat(queue): add graceful disconnect

This commit is contained in:
Ilya Averyanov 2024-06-27 21:17:53 +03:00
parent 1d728a05b2
commit d32f282feb
11 changed files with 229 additions and 28 deletions

View File

@ -757,7 +757,7 @@ skip_batch(StreamKey, SRS0, Session = #{s := S0}, ClientInfo, Reason) ->
%%--------------------------------------------------------------------
-spec disconnect(session(), emqx_types:conninfo()) -> {shutdown, session()}.
disconnect(Session = #{id := Id, s := S0}, ConnInfo) ->
disconnect(Session = #{id := Id, s := S0, shared_sub_s := SharedSubS0}, ConnInfo) ->
S1 = maybe_set_offline_info(S0, Id),
S2 = emqx_persistent_session_ds_state:set_last_alive_at(now_ms(), S1),
S3 =
@ -767,8 +767,9 @@ disconnect(Session = #{id := Id, s := S0}, ConnInfo) ->
_ ->
S2
end,
S = emqx_persistent_session_ds_state:commit(S3),
{shutdown, Session#{s => S}}.
{S4, SharedSubS} = emqx_persistent_session_ds_shared_subs:on_disconnect(S3, SharedSubS0),
S = emqx_persistent_session_ds_state:commit(S4),
{shutdown, Session#{s => S, shared_sub_s => SharedSubS}}.
-spec terminate(Reason :: term(), session()) -> ok.
terminate(_Reason, Session = #{id := Id, s := S}) ->

View File

@ -15,6 +15,7 @@
on_subscribe/3,
on_unsubscribe/4,
on_disconnect/2,
on_streams_replayed/2,
on_info/3,
@ -118,29 +119,20 @@ renew_streams(S0, #{agent := Agent0} = SharedSubS0) ->
t()
) -> {emqx_persistent_session_ds_state:t(), t()}.
on_streams_replayed(S, #{agent := Agent0} = SharedSubS0) ->
%% TODO
%% Is it sufficient for a report?
Progress = fold_shared_stream_states(
fun(TopicFilter, Stream, SRS, Acc) ->
#srs{it_begin = BeginIt} = SRS,
StreamProgress = #{
topic_filter => TopicFilter,
stream => Stream,
iterator => BeginIt,
use_finished => is_use_finished(S, SRS)
},
[StreamProgress | Acc]
end,
[],
S
),
Progresses = stream_progresses(S),
Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_stream_progress(
Agent0, Progress
Agent0, Progresses
),
SharedSubS1 = SharedSubS0#{agent => Agent1},
{S, SharedSubS1}.
on_disconnect(S0, #{agent := Agent0} = SharedSubS0) ->
S1 = revoke_all_streams(S0),
Progresses = stream_progresses(S1),
Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_disconnect(Agent0, Progresses),
SharedSubS1 = SharedSubS0#{agent => Agent1},
{S1, SharedSubS1}.
-spec on_info(emqx_persistent_session_ds_state:t(), t(), term()) ->
{emqx_persistent_session_ds_state:t(), t()}.
on_info(S, #{agent := Agent0} = SharedSubS0, Info) ->
@ -157,6 +149,30 @@ to_map(_S, _SharedSubS) ->
%% Internal functions
%%--------------------------------------------------------------------
stream_progresses(S) ->
fold_shared_stream_states(
fun(TopicFilter, Stream, SRS, Acc) ->
#srs{it_begin = BeginIt} = SRS,
case is_stream_fully_acked(S, SRS) of
true ->
%% TODO
%% Is it sufficient for a report?
StreamProgress = #{
topic_filter => TopicFilter,
stream => Stream,
iterator => BeginIt,
use_finished => is_use_finished(S, SRS)
},
[StreamProgress | Acc];
false ->
Acc
end
end,
[],
S
).
fold_shared_subs(Fun, Acc, S) ->
emqx_persistent_session_ds_state:fold_subscriptions(
fun
@ -322,6 +338,15 @@ revoke_stream(
end
end.
revoke_all_streams(S0) ->
fold_shared_stream_states(
fun(TopicFilter, Stream, _SRS, S) ->
revoke_stream(#{topic_filter => TopicFilter, stream => Stream}, S)
end,
S0,
S0
).
-spec to_agent_subscription(
emqx_persistent_session_ds_state:t(), emqx_persistent_session_ds:subscription()
) ->
@ -339,5 +364,8 @@ agent_opts(#{session_id := SessionId}) ->
now_ms() ->
erlang:system_time(millisecond).
is_use_finished(S, #srs{unsubscribed = Unsubscribed} = SRS) ->
Unsubscribed andalso emqx_persistent_session_ds_stream_scheduler:is_fully_acked(SRS, S).
is_use_finished(S, #srs{unsubscribed = Unsubscribed}) ->
Unsubscribed.
is_stream_fully_acked(S, SRS) ->
emqx_persistent_session_ds_stream_scheduler:is_fully_acked(SRS, S).

View File

@ -44,7 +44,8 @@
-type stream_progress() :: #{
topic_filter := topic_filter(),
stream := emqx_ds:stream(),
iterator := emqx_ds:iterator()
iterator := emqx_ds:iterator(),
use_finished := boolean()
}.
-export_type([
@ -63,6 +64,7 @@
on_unsubscribe/2,
on_stream_progress/2,
on_info/2,
on_disconnect/2,
renew_streams/1
]).
@ -81,6 +83,7 @@
-callback on_subscribe(t(), topic_filter(), emqx_types:subopts()) ->
{ok, t()} | {error, term()}.
-callback on_unsubscribe(t(), topic_filter()) -> t().
-callback on_disconnect(t(), [stream_progress()]) -> t().
-callback renew_streams(t()) -> {[stream_lease_event()], t()}.
-callback on_stream_progress(t(), [stream_progress()]) -> t().
-callback on_info(t(), term()) -> t().
@ -106,6 +109,10 @@ on_subscribe(Agent, TopicFilter, SubOpts) ->
on_unsubscribe(Agent, TopicFilter) ->
?shared_subs_agent:on_unsubscribe(Agent, TopicFilter).
-spec on_disconnect(t(), [stream_progress()]) -> t().
on_disconnect(Agent, StreamProgresses) ->
?shared_subs_agent:on_disconnect(Agent, StreamProgresses).
-spec renew_streams(t()) -> {[stream_lease_event()], t()}.
renew_streams(Agent) ->
?shared_subs_agent:renew_streams(Agent).

View File

@ -14,6 +14,7 @@
on_unsubscribe/2,
on_stream_progress/2,
on_info/2,
on_disconnect/1,
renew_streams/1
]).
@ -36,6 +37,9 @@ on_subscribe(_Agent, _TopicFilter, _SubOpts) ->
on_unsubscribe(Agent, _TopicFilter) ->
Agent.
on_disconnect(Agent) ->
Agent.
renew_streams(Agent) ->
{[], Agent}.

View File

@ -17,6 +17,7 @@
on_unsubscribe/2,
on_stream_progress/2,
on_info/2,
on_disconnect/2,
renew_streams/1
]).
@ -68,6 +69,19 @@ on_stream_progress(State, StreamProgresses) ->
maps:to_list(ProgressesByGroup)
).
on_disconnect(#{groups := Groups0} = State, StreamProgresses) ->
ProgressesByGroup = stream_progresses_by_group(StreamProgresses),
Groups1 = maps:fold(
fun(Group, GroupSM0, GroupsAcc) ->
GroupProgresses = maps:get(Group, ProgressesByGroup, []),
GroupSM1 = emqx_ds_shared_sub_group_sm:handle_disconnect(GroupSM0, GroupProgresses),
GroupsAcc#{Group => GroupSM1}
end,
#{},
Groups0
),
State#{groups => Groups1}.
on_info(State, ?leader_lease_streams_match(Group, Leader, StreamProgresses, Version)) ->
?SLOG(info, #{
msg => leader_lease_streams,

View File

@ -27,7 +27,8 @@
%% API
fetch_stream_events/1,
handle_stream_progress/2
handle_stream_progress/2,
handle_disconnect/2
]).
-export_type([
@ -72,8 +73,9 @@
-define(connecting, connecting).
-define(replaying, replaying).
-define(updating, updating).
-define(disconnected, disconnected).
-type state() :: ?connecting | ?replaying | ?updating.
-type state() :: ?connecting | ?replaying | ?updating | ?disconnected.
-type connecting_data() :: #{}.
-type replaying_data() :: #{
@ -169,6 +171,18 @@ fetch_stream_events(
),
{GSM#{stream_lease_events => []}, Events1}.
-spec handle_disconnect(group_sm(), emqx_ds_shared_sub_proto:agent_stream_progress()) -> group_sm().
handle_disconnect(#{state := ?connecting} = GSM, _StreamProgresses) ->
transition(GSM, ?disconnected, #{});
handle_disconnect(
#{agent := Agent, state_data := #{leader := Leader, version := Version} = StateData} = GSM,
StreamProgresses
) ->
ok = emqx_ds_shared_sub_proto:agent_disconnect(
Leader, Agent, StreamProgresses, Version
),
transition(GSM, ?disconnected, StateData).
%%-----------------------------------------------------------------------
%% Event Handlers
%%-----------------------------------------------------------------------
@ -229,6 +243,12 @@ handle_updating(GSM0) ->
),
GSM2.
%%-----------------------------------------------------------------------
%% Disconnected state
handle_disconnected(GSM) ->
GSM.
%%-----------------------------------------------------------------------
%% Common handlers
@ -301,6 +321,10 @@ handle_leader_update_streams(
_StreamProgresses
) ->
ensure_state_timeout(GSM, renew_lease_timeout, ?RENEW_LEASE_TIMEOUT);
handle_leader_update_streams(
#{state := ?disconnected} = GSM, _VersionOld, _VersionNew, _StreamProgresses
) ->
GSM;
handle_leader_update_streams(GSM, VersionOld, VersionNew, _StreamProgresses) ->
?tp(warning, shared_sub_group_sm_unexpected_leader_update_streams, #{
gsm => GSM,
@ -335,6 +359,10 @@ handle_leader_renew_stream_lease(
VersionNew
) ->
ensure_state_timeout(GSM, renew_lease_timeout, ?RENEW_LEASE_TIMEOUT);
handle_leader_renew_stream_lease(
#{state := ?disconnected} = GSM, _VersionOld, _VersionNew
) ->
GSM;
handle_leader_renew_stream_lease(GSM, VersionOld, VersionNew) ->
?tp(warning, shared_sub_group_sm_unexpected_leader_renew_stream_lease, #{
gsm => GSM,
@ -344,6 +372,8 @@ handle_leader_renew_stream_lease(GSM, VersionOld, VersionNew) ->
%% Unexpected versions or state
transition(GSM, ?connecting, #{}).
-spec handle_stream_progress(group_sm(), emqx_ds_shared_sub_proto:agent_stream_progress()) ->
group_sm().
handle_stream_progress(#{state := ?connecting} = GSM, _StreamProgresses) ->
GSM;
handle_stream_progress(
@ -376,7 +406,9 @@ handle_stream_progress(
ok = emqx_ds_shared_sub_proto:agent_update_stream_states(
Leader, Agent, StreamProgresses, PrevVersion, Version
),
ensure_state_timeout(GSM, update_stream_state_timeout, ?MIN_UPDATE_STREAM_STATE_INTERVAL).
ensure_state_timeout(GSM, update_stream_state_timeout, ?MIN_UPDATE_STREAM_STATE_INTERVAL);
handle_stream_progress(#{state := ?disconnected} = GSM, _StreamProgresses) ->
GSM.
handle_leader_invalidate(GSM) ->
transition(GSM, ?connecting, #{}).
@ -485,7 +517,9 @@ run_enter_callback(#{state := ?connecting} = GSM) ->
run_enter_callback(#{state := ?replaying} = GSM) ->
handle_replaying(GSM);
run_enter_callback(#{state := ?updating} = GSM) ->
handle_updating(GSM).
handle_updating(GSM);
run_enter_callback(#{state := ?disconnected} = GSM) ->
handle_disconnected(GSM).
progresses_to_lease_events(StreamProgresses) ->
lists:map(

View File

@ -213,6 +213,19 @@ handle_event(
update_agent_stream_states(Data0, Agent, StreamProgresses, VersionOld, VersionNew)
end),
{keep_state, Data1};
handle_event(
info,
?agent_disconnect_match(Agent, StreamProgresses, Version),
?leader_active,
Data0
) ->
% ?tp(warning, shared_sub_leader_disconnect, #{
% agent => Agent, version => Version
% }),
Data1 = with_agent(Data0, Agent, fun() ->
disconnect_agent(Data0, Agent, StreamProgresses, Version)
end),
{keep_state, Data1};
%%--------------------------------------------------------------------
%% fallback
handle_event(enter, _OldState, _State, _Data) ->
@ -399,6 +412,28 @@ assign_initial_streams_to_agent(Data, Agent, AgentMetadata, AssignCount) ->
),
set_agent_state(Data1, Agent, AgentState).
%%--------------------------------------------------------------------
%% Disconnect agent gracefully
disconnect_agent(Data0, Agent, AgentStreamProgresses, Version) ->
case get_agent_state(Data0, Agent) of
#{version := Version} ->
?tp(warning, shared_sub_leader_disconnect_agent, #{
agent => Agent,
version => Version
}),
Data1 = update_agent_stream_states(Data0, Agent, AgentStreamProgresses, Version),
Data2 = drop_agent(Data1, Agent),
Data2;
_ ->
?tp(warning, shared_sub_leader_unexpected_disconnect, #{
agent => Agent,
version => Version
}),
Data1 = drop_agent(Data0, Agent),
Data1
end.
%%--------------------------------------------------------------------
%% Drop agents that stopped reporting progress
@ -790,6 +825,7 @@ drop_agent(#{agents := Agents} = Data0, Agent) ->
#{streams := Streams, revoked_streams := RevokedStreams} = AgentState,
AllStreams = Streams ++ RevokedStreams,
Data1 = unassign_streams(Data0, AllStreams),
?tp(warning, shared_sub_leader_drop_agent, #{agent => Agent}),
Data1#{agents => maps:remove(Agent, Agents)}.
invalidate_agent(#{group := Group}, Agent) ->

View File

@ -16,6 +16,7 @@
agent_connect_leader/4,
agent_update_stream_states/4,
agent_update_stream_states/5,
agent_disconnect/4,
leader_lease_streams/5,
leader_renew_stream_lease/3,
@ -124,6 +125,23 @@ agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, VersionOld, Ve
?leader_node(ToLeader), ToLeader, FromAgent, StreamProgresses, VersionOld, VersionNew
).
agent_disconnect(ToLeader, FromAgent, StreamProgresses, Version) when
?is_local_leader(ToLeader)
->
?tp(warning, shared_sub_proto_msg, #{
type => agent_disconnect,
to_leader => ToLeader,
from_agent => FromAgent,
stream_progresses => format_streams(StreamProgresses),
version => Version
}),
_ = erlang:send(ToLeader, ?agent_disconnect(FromAgent, StreamProgresses, Version)),
ok;
agent_disconnect(ToLeader, FromAgent, StreamProgresses, Version) ->
emqx_ds_shared_sub_proto_v1:agent_disconnect(
?leader_node(ToLeader), ToLeader, FromAgent, StreamProgresses, Version
).
%% leader -> agent messages
-spec leader_lease_streams(agent(), group(), leader(), list(stream_progress()), version()) -> ok.

View File

@ -16,6 +16,7 @@
-define(agent_update_stream_states_msg, agent_update_stream_states).
-define(agent_connect_leader_timeout_msg, agent_connect_leader_timeout).
-define(agent_renew_stream_lease_timeout_msg, agent_renew_stream_lease_timeout).
-define(agent_disconnect_msg, agent_disconnect).
%% Agent messages sent to the leader.
%% Leader talks to many agents, `agent` field is used to identify the sender.
@ -64,6 +65,20 @@
agent := Agent
}).
-define(agent_disconnect(Agent, StreamStates, Version), #{
type => ?agent_disconnect_msg,
stream_states => StreamStates,
version => Version,
agent => Agent
}).
-define(agent_disconnect_match(Agent, StreamStates, Version), #{
type := ?agent_disconnect_msg,
stream_states := StreamStates,
version := Version,
agent := Agent
}).
%% leader messages, sent from the leader to the agent
%% Agent may have several shared subscriptions, so may talk to several leaders
%% `group` field is used to identify the leader.

View File

@ -62,6 +62,18 @@ agent_update_stream_states(Node, ToLeader, FromAgent, StreamProgresses, VersionO
ToLeader, FromAgent, StreamProgresses, VersionOld, VersionNew
]).
-spec agent_disconnect(
node(),
emqx_ds_shared_sub_proto:leader(),
emqx_ds_shared_sub_proto:agent(),
list(emqx_ds_shared_sub_proto:agent_stream_progress()),
emqx_ds_shared_sub_proto:version()
) -> ok.
agent_disconnect(Node, ToLeader, FromAgent, StreamProgresses, Version) ->
erpc:cast(Node, emqx_ds_shared_sub_proto, agent_disconnect, [
ToLeader, FromAgent, StreamProgresses, Version
]).
%% leader -> agent messages
-spec leader_lease_streams(

View File

@ -152,6 +152,38 @@ t_stream_revoke(_Config) ->
ok = emqtt:disconnect(ConnShared2),
ok = emqtt:disconnect(ConnPub).
t_graceful_disconnect(_Config) ->
ConnShared1 = emqtt_connect_sub(<<"client_shared1">>),
{ok, _, _} = emqtt:subscribe(ConnShared1, <<"$share/gr4/topic7/#">>, 1),
ConnShared2 = emqtt_connect_sub(<<"client_shared2">>),
{ok, _, _} = emqtt:subscribe(ConnShared2, <<"$share/gr4/topic7/#">>, 1),
ConnPub = emqtt_connect_pub(<<"client_pub">>),
{ok, _} = emqtt:publish(ConnPub, <<"topic7/1">>, <<"hello1">>, 1),
{ok, _} = emqtt:publish(ConnPub, <<"topic7/2">>, <<"hello2">>, 1),
?assertReceive({publish, #{payload := <<"hello1">>}}, 2_000),
?assertReceive({publish, #{payload := <<"hello2">>}}, 2_000),
?assertWaitEvent(
ok = emqtt:disconnect(ConnShared1),
#{?snk_kind := shared_sub_leader_disconnect_agent},
1_000
),
{ok, _} = emqtt:publish(ConnPub, <<"topic7/1">>, <<"hello3">>, 1),
{ok, _} = emqtt:publish(ConnPub, <<"topic7/2">>, <<"hello4">>, 1),
%% Since the disconnect is graceful, the streams should rebalance quickly,
%% before the timeout.
?assertReceive({publish, #{payload := <<"hello3">>}}, 2_000),
?assertReceive({publish, #{payload := <<"hello4">>}}, 2_000),
ok = emqtt:disconnect(ConnShared2),
ok = emqtt:disconnect(ConnPub).
t_lease_reconnect(_Config) ->
ConnPub = emqtt_connect_pub(<<"client_pub">>),