From d32f282feb5c2822e3cbfb7673422ccec720484f Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Thu, 27 Jun 2024 21:17:53 +0300 Subject: [PATCH] feat(queue): add graceful disconnect --- apps/emqx/src/emqx_persistent_session_ds.erl | 7 +- ...emqx_persistent_session_ds_shared_subs.erl | 68 +++++++++++++------ ...ersistent_session_ds_shared_subs_agent.erl | 9 ++- ...tent_session_ds_shared_subs_null_agent.erl | 4 ++ .../src/emqx_ds_shared_sub_agent.erl | 14 ++++ .../src/emqx_ds_shared_sub_group_sm.erl | 42 ++++++++++-- .../src/emqx_ds_shared_sub_leader.erl | 36 ++++++++++ .../src/emqx_ds_shared_sub_proto.erl | 18 +++++ .../src/emqx_ds_shared_sub_proto.hrl | 15 ++++ .../src/proto/emqx_ds_shared_sub_proto_v1.erl | 12 ++++ .../test/emqx_ds_shared_sub_SUITE.erl | 32 +++++++++ 11 files changed, 229 insertions(+), 28 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 32c291eec..dc4a74b43 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -757,7 +757,7 @@ skip_batch(StreamKey, SRS0, Session = #{s := S0}, ClientInfo, Reason) -> %%-------------------------------------------------------------------- -spec disconnect(session(), emqx_types:conninfo()) -> {shutdown, session()}. -disconnect(Session = #{id := Id, s := S0}, ConnInfo) -> +disconnect(Session = #{id := Id, s := S0, shared_sub_s := SharedSubS0}, ConnInfo) -> S1 = maybe_set_offline_info(S0, Id), S2 = emqx_persistent_session_ds_state:set_last_alive_at(now_ms(), S1), S3 = @@ -767,8 +767,9 @@ disconnect(Session = #{id := Id, s := S0}, ConnInfo) -> _ -> S2 end, - S = emqx_persistent_session_ds_state:commit(S3), - {shutdown, Session#{s => S}}. + {S4, SharedSubS} = emqx_persistent_session_ds_shared_subs:on_disconnect(S3, SharedSubS0), + S = emqx_persistent_session_ds_state:commit(S4), + {shutdown, Session#{s => S, shared_sub_s => SharedSubS}}. -spec terminate(Reason :: term(), session()) -> ok. terminate(_Reason, Session = #{id := Id, s := S}) -> diff --git a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl index f3aaa146e..0274b9b9e 100644 --- a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl +++ b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl @@ -15,6 +15,7 @@ on_subscribe/3, on_unsubscribe/4, + on_disconnect/2, on_streams_replayed/2, on_info/3, @@ -118,29 +119,20 @@ renew_streams(S0, #{agent := Agent0} = SharedSubS0) -> t() ) -> {emqx_persistent_session_ds_state:t(), t()}. on_streams_replayed(S, #{agent := Agent0} = SharedSubS0) -> - %% TODO - %% Is it sufficient for a report? - Progress = fold_shared_stream_states( - fun(TopicFilter, Stream, SRS, Acc) -> - #srs{it_begin = BeginIt} = SRS, - - StreamProgress = #{ - topic_filter => TopicFilter, - stream => Stream, - iterator => BeginIt, - use_finished => is_use_finished(S, SRS) - }, - [StreamProgress | Acc] - end, - [], - S - ), + Progresses = stream_progresses(S), Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_stream_progress( - Agent0, Progress + Agent0, Progresses ), SharedSubS1 = SharedSubS0#{agent => Agent1}, {S, SharedSubS1}. +on_disconnect(S0, #{agent := Agent0} = SharedSubS0) -> + S1 = revoke_all_streams(S0), + Progresses = stream_progresses(S1), + Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_disconnect(Agent0, Progresses), + SharedSubS1 = SharedSubS0#{agent => Agent1}, + {S1, SharedSubS1}. + -spec on_info(emqx_persistent_session_ds_state:t(), t(), term()) -> {emqx_persistent_session_ds_state:t(), t()}. on_info(S, #{agent := Agent0} = SharedSubS0, Info) -> @@ -157,6 +149,30 @@ to_map(_S, _SharedSubS) -> %% Internal functions %%-------------------------------------------------------------------- +stream_progresses(S) -> + fold_shared_stream_states( + fun(TopicFilter, Stream, SRS, Acc) -> + #srs{it_begin = BeginIt} = SRS, + + case is_stream_fully_acked(S, SRS) of + true -> + %% TODO + %% Is it sufficient for a report? + StreamProgress = #{ + topic_filter => TopicFilter, + stream => Stream, + iterator => BeginIt, + use_finished => is_use_finished(S, SRS) + }, + [StreamProgress | Acc]; + false -> + Acc + end + end, + [], + S + ). + fold_shared_subs(Fun, Acc, S) -> emqx_persistent_session_ds_state:fold_subscriptions( fun @@ -322,6 +338,15 @@ revoke_stream( end end. +revoke_all_streams(S0) -> + fold_shared_stream_states( + fun(TopicFilter, Stream, _SRS, S) -> + revoke_stream(#{topic_filter => TopicFilter, stream => Stream}, S) + end, + S0, + S0 + ). + -spec to_agent_subscription( emqx_persistent_session_ds_state:t(), emqx_persistent_session_ds:subscription() ) -> @@ -339,5 +364,8 @@ agent_opts(#{session_id := SessionId}) -> now_ms() -> erlang:system_time(millisecond). -is_use_finished(S, #srs{unsubscribed = Unsubscribed} = SRS) -> - Unsubscribed andalso emqx_persistent_session_ds_stream_scheduler:is_fully_acked(SRS, S). +is_use_finished(S, #srs{unsubscribed = Unsubscribed}) -> + Unsubscribed. + +is_stream_fully_acked(S, SRS) -> + emqx_persistent_session_ds_stream_scheduler:is_fully_acked(SRS, S). diff --git a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs_agent.erl b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs_agent.erl index 97b38d0f2..72b4fa22d 100644 --- a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs_agent.erl +++ b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs_agent.erl @@ -44,7 +44,8 @@ -type stream_progress() :: #{ topic_filter := topic_filter(), stream := emqx_ds:stream(), - iterator := emqx_ds:iterator() + iterator := emqx_ds:iterator(), + use_finished := boolean() }. -export_type([ @@ -63,6 +64,7 @@ on_unsubscribe/2, on_stream_progress/2, on_info/2, + on_disconnect/2, renew_streams/1 ]). @@ -81,6 +83,7 @@ -callback on_subscribe(t(), topic_filter(), emqx_types:subopts()) -> {ok, t()} | {error, term()}. -callback on_unsubscribe(t(), topic_filter()) -> t(). +-callback on_disconnect(t(), [stream_progress()]) -> t(). -callback renew_streams(t()) -> {[stream_lease_event()], t()}. -callback on_stream_progress(t(), [stream_progress()]) -> t(). -callback on_info(t(), term()) -> t(). @@ -106,6 +109,10 @@ on_subscribe(Agent, TopicFilter, SubOpts) -> on_unsubscribe(Agent, TopicFilter) -> ?shared_subs_agent:on_unsubscribe(Agent, TopicFilter). +-spec on_disconnect(t(), [stream_progress()]) -> t(). +on_disconnect(Agent, StreamProgresses) -> + ?shared_subs_agent:on_disconnect(Agent, StreamProgresses). + -spec renew_streams(t()) -> {[stream_lease_event()], t()}. renew_streams(Agent) -> ?shared_subs_agent:renew_streams(Agent). diff --git a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs_null_agent.erl b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs_null_agent.erl index e158c19e2..5bdae08da 100644 --- a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs_null_agent.erl +++ b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs_null_agent.erl @@ -14,6 +14,7 @@ on_unsubscribe/2, on_stream_progress/2, on_info/2, + on_disconnect/1, renew_streams/1 ]). @@ -36,6 +37,9 @@ on_subscribe(_Agent, _TopicFilter, _SubOpts) -> on_unsubscribe(Agent, _TopicFilter) -> Agent. +on_disconnect(Agent) -> + Agent. + renew_streams(Agent) -> {[], Agent}. diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_agent.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_agent.erl index 11cb011d3..0e8d17614 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_agent.erl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_agent.erl @@ -17,6 +17,7 @@ on_unsubscribe/2, on_stream_progress/2, on_info/2, + on_disconnect/2, renew_streams/1 ]). @@ -68,6 +69,19 @@ on_stream_progress(State, StreamProgresses) -> maps:to_list(ProgressesByGroup) ). +on_disconnect(#{groups := Groups0} = State, StreamProgresses) -> + ProgressesByGroup = stream_progresses_by_group(StreamProgresses), + Groups1 = maps:fold( + fun(Group, GroupSM0, GroupsAcc) -> + GroupProgresses = maps:get(Group, ProgressesByGroup, []), + GroupSM1 = emqx_ds_shared_sub_group_sm:handle_disconnect(GroupSM0, GroupProgresses), + GroupsAcc#{Group => GroupSM1} + end, + #{}, + Groups0 + ), + State#{groups => Groups1}. + on_info(State, ?leader_lease_streams_match(Group, Leader, StreamProgresses, Version)) -> ?SLOG(info, #{ msg => leader_lease_streams, diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl index cd029d8df..3932aa6ce 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl @@ -27,7 +27,8 @@ %% API fetch_stream_events/1, - handle_stream_progress/2 + handle_stream_progress/2, + handle_disconnect/2 ]). -export_type([ @@ -72,8 +73,9 @@ -define(connecting, connecting). -define(replaying, replaying). -define(updating, updating). +-define(disconnected, disconnected). --type state() :: ?connecting | ?replaying | ?updating. +-type state() :: ?connecting | ?replaying | ?updating | ?disconnected. -type connecting_data() :: #{}. -type replaying_data() :: #{ @@ -169,6 +171,18 @@ fetch_stream_events( ), {GSM#{stream_lease_events => []}, Events1}. +-spec handle_disconnect(group_sm(), emqx_ds_shared_sub_proto:agent_stream_progress()) -> group_sm(). +handle_disconnect(#{state := ?connecting} = GSM, _StreamProgresses) -> + transition(GSM, ?disconnected, #{}); +handle_disconnect( + #{agent := Agent, state_data := #{leader := Leader, version := Version} = StateData} = GSM, + StreamProgresses +) -> + ok = emqx_ds_shared_sub_proto:agent_disconnect( + Leader, Agent, StreamProgresses, Version + ), + transition(GSM, ?disconnected, StateData). + %%----------------------------------------------------------------------- %% Event Handlers %%----------------------------------------------------------------------- @@ -229,6 +243,12 @@ handle_updating(GSM0) -> ), GSM2. +%%----------------------------------------------------------------------- +%% Disconnected state + +handle_disconnected(GSM) -> + GSM. + %%----------------------------------------------------------------------- %% Common handlers @@ -301,6 +321,10 @@ handle_leader_update_streams( _StreamProgresses ) -> ensure_state_timeout(GSM, renew_lease_timeout, ?RENEW_LEASE_TIMEOUT); +handle_leader_update_streams( + #{state := ?disconnected} = GSM, _VersionOld, _VersionNew, _StreamProgresses +) -> + GSM; handle_leader_update_streams(GSM, VersionOld, VersionNew, _StreamProgresses) -> ?tp(warning, shared_sub_group_sm_unexpected_leader_update_streams, #{ gsm => GSM, @@ -335,6 +359,10 @@ handle_leader_renew_stream_lease( VersionNew ) -> ensure_state_timeout(GSM, renew_lease_timeout, ?RENEW_LEASE_TIMEOUT); +handle_leader_renew_stream_lease( + #{state := ?disconnected} = GSM, _VersionOld, _VersionNew +) -> + GSM; handle_leader_renew_stream_lease(GSM, VersionOld, VersionNew) -> ?tp(warning, shared_sub_group_sm_unexpected_leader_renew_stream_lease, #{ gsm => GSM, @@ -344,6 +372,8 @@ handle_leader_renew_stream_lease(GSM, VersionOld, VersionNew) -> %% Unexpected versions or state transition(GSM, ?connecting, #{}). +-spec handle_stream_progress(group_sm(), emqx_ds_shared_sub_proto:agent_stream_progress()) -> + group_sm(). handle_stream_progress(#{state := ?connecting} = GSM, _StreamProgresses) -> GSM; handle_stream_progress( @@ -376,7 +406,9 @@ handle_stream_progress( ok = emqx_ds_shared_sub_proto:agent_update_stream_states( Leader, Agent, StreamProgresses, PrevVersion, Version ), - ensure_state_timeout(GSM, update_stream_state_timeout, ?MIN_UPDATE_STREAM_STATE_INTERVAL). + ensure_state_timeout(GSM, update_stream_state_timeout, ?MIN_UPDATE_STREAM_STATE_INTERVAL); +handle_stream_progress(#{state := ?disconnected} = GSM, _StreamProgresses) -> + GSM. handle_leader_invalidate(GSM) -> transition(GSM, ?connecting, #{}). @@ -485,7 +517,9 @@ run_enter_callback(#{state := ?connecting} = GSM) -> run_enter_callback(#{state := ?replaying} = GSM) -> handle_replaying(GSM); run_enter_callback(#{state := ?updating} = GSM) -> - handle_updating(GSM). + handle_updating(GSM); +run_enter_callback(#{state := ?disconnected} = GSM) -> + handle_disconnected(GSM). progresses_to_lease_events(StreamProgresses) -> lists:map( diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl index 379c0278f..2bbdc67d8 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl @@ -213,6 +213,19 @@ handle_event( update_agent_stream_states(Data0, Agent, StreamProgresses, VersionOld, VersionNew) end), {keep_state, Data1}; +handle_event( + info, + ?agent_disconnect_match(Agent, StreamProgresses, Version), + ?leader_active, + Data0 +) -> + % ?tp(warning, shared_sub_leader_disconnect, #{ + % agent => Agent, version => Version + % }), + Data1 = with_agent(Data0, Agent, fun() -> + disconnect_agent(Data0, Agent, StreamProgresses, Version) + end), + {keep_state, Data1}; %%-------------------------------------------------------------------- %% fallback handle_event(enter, _OldState, _State, _Data) -> @@ -399,6 +412,28 @@ assign_initial_streams_to_agent(Data, Agent, AgentMetadata, AssignCount) -> ), set_agent_state(Data1, Agent, AgentState). +%%-------------------------------------------------------------------- +%% Disconnect agent gracefully + +disconnect_agent(Data0, Agent, AgentStreamProgresses, Version) -> + case get_agent_state(Data0, Agent) of + #{version := Version} -> + ?tp(warning, shared_sub_leader_disconnect_agent, #{ + agent => Agent, + version => Version + }), + Data1 = update_agent_stream_states(Data0, Agent, AgentStreamProgresses, Version), + Data2 = drop_agent(Data1, Agent), + Data2; + _ -> + ?tp(warning, shared_sub_leader_unexpected_disconnect, #{ + agent => Agent, + version => Version + }), + Data1 = drop_agent(Data0, Agent), + Data1 + end. + %%-------------------------------------------------------------------- %% Drop agents that stopped reporting progress @@ -790,6 +825,7 @@ drop_agent(#{agents := Agents} = Data0, Agent) -> #{streams := Streams, revoked_streams := RevokedStreams} = AgentState, AllStreams = Streams ++ RevokedStreams, Data1 = unassign_streams(Data0, AllStreams), + ?tp(warning, shared_sub_leader_drop_agent, #{agent => Agent}), Data1#{agents => maps:remove(Agent, Agents)}. invalidate_agent(#{group := Group}, Agent) -> diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.erl index ec0a25f14..53a6693b2 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.erl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.erl @@ -16,6 +16,7 @@ agent_connect_leader/4, agent_update_stream_states/4, agent_update_stream_states/5, + agent_disconnect/4, leader_lease_streams/5, leader_renew_stream_lease/3, @@ -124,6 +125,23 @@ agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, VersionOld, Ve ?leader_node(ToLeader), ToLeader, FromAgent, StreamProgresses, VersionOld, VersionNew ). +agent_disconnect(ToLeader, FromAgent, StreamProgresses, Version) when + ?is_local_leader(ToLeader) +-> + ?tp(warning, shared_sub_proto_msg, #{ + type => agent_disconnect, + to_leader => ToLeader, + from_agent => FromAgent, + stream_progresses => format_streams(StreamProgresses), + version => Version + }), + _ = erlang:send(ToLeader, ?agent_disconnect(FromAgent, StreamProgresses, Version)), + ok; +agent_disconnect(ToLeader, FromAgent, StreamProgresses, Version) -> + emqx_ds_shared_sub_proto_v1:agent_disconnect( + ?leader_node(ToLeader), ToLeader, FromAgent, StreamProgresses, Version + ). + %% leader -> agent messages -spec leader_lease_streams(agent(), group(), leader(), list(stream_progress()), version()) -> ok. diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.hrl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.hrl index a2cf284f3..f8158c918 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.hrl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.hrl @@ -16,6 +16,7 @@ -define(agent_update_stream_states_msg, agent_update_stream_states). -define(agent_connect_leader_timeout_msg, agent_connect_leader_timeout). -define(agent_renew_stream_lease_timeout_msg, agent_renew_stream_lease_timeout). +-define(agent_disconnect_msg, agent_disconnect). %% Agent messages sent to the leader. %% Leader talks to many agents, `agent` field is used to identify the sender. @@ -64,6 +65,20 @@ agent := Agent }). +-define(agent_disconnect(Agent, StreamStates, Version), #{ + type => ?agent_disconnect_msg, + stream_states => StreamStates, + version => Version, + agent => Agent +}). + +-define(agent_disconnect_match(Agent, StreamStates, Version), #{ + type := ?agent_disconnect_msg, + stream_states := StreamStates, + version := Version, + agent := Agent +}). + %% leader messages, sent from the leader to the agent %% Agent may have several shared subscriptions, so may talk to several leaders %% `group` field is used to identify the leader. diff --git a/apps/emqx_ds_shared_sub/src/proto/emqx_ds_shared_sub_proto_v1.erl b/apps/emqx_ds_shared_sub/src/proto/emqx_ds_shared_sub_proto_v1.erl index 01c704cb0..117b34e98 100644 --- a/apps/emqx_ds_shared_sub/src/proto/emqx_ds_shared_sub_proto_v1.erl +++ b/apps/emqx_ds_shared_sub/src/proto/emqx_ds_shared_sub_proto_v1.erl @@ -62,6 +62,18 @@ agent_update_stream_states(Node, ToLeader, FromAgent, StreamProgresses, VersionO ToLeader, FromAgent, StreamProgresses, VersionOld, VersionNew ]). +-spec agent_disconnect( + node(), + emqx_ds_shared_sub_proto:leader(), + emqx_ds_shared_sub_proto:agent(), + list(emqx_ds_shared_sub_proto:agent_stream_progress()), + emqx_ds_shared_sub_proto:version() +) -> ok. +agent_disconnect(Node, ToLeader, FromAgent, StreamProgresses, Version) -> + erpc:cast(Node, emqx_ds_shared_sub_proto, agent_disconnect, [ + ToLeader, FromAgent, StreamProgresses, Version + ]). + %% leader -> agent messages -spec leader_lease_streams( diff --git a/apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_SUITE.erl b/apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_SUITE.erl index 8bb460967..e9d83d4fb 100644 --- a/apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_SUITE.erl +++ b/apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_SUITE.erl @@ -152,6 +152,38 @@ t_stream_revoke(_Config) -> ok = emqtt:disconnect(ConnShared2), ok = emqtt:disconnect(ConnPub). +t_graceful_disconnect(_Config) -> + ConnShared1 = emqtt_connect_sub(<<"client_shared1">>), + {ok, _, _} = emqtt:subscribe(ConnShared1, <<"$share/gr4/topic7/#">>, 1), + + ConnShared2 = emqtt_connect_sub(<<"client_shared2">>), + {ok, _, _} = emqtt:subscribe(ConnShared2, <<"$share/gr4/topic7/#">>, 1), + + ConnPub = emqtt_connect_pub(<<"client_pub">>), + + {ok, _} = emqtt:publish(ConnPub, <<"topic7/1">>, <<"hello1">>, 1), + {ok, _} = emqtt:publish(ConnPub, <<"topic7/2">>, <<"hello2">>, 1), + + ?assertReceive({publish, #{payload := <<"hello1">>}}, 2_000), + ?assertReceive({publish, #{payload := <<"hello2">>}}, 2_000), + + ?assertWaitEvent( + ok = emqtt:disconnect(ConnShared1), + #{?snk_kind := shared_sub_leader_disconnect_agent}, + 1_000 + ), + + {ok, _} = emqtt:publish(ConnPub, <<"topic7/1">>, <<"hello3">>, 1), + {ok, _} = emqtt:publish(ConnPub, <<"topic7/2">>, <<"hello4">>, 1), + + %% Since the disconnect is graceful, the streams should rebalance quickly, + %% before the timeout. + ?assertReceive({publish, #{payload := <<"hello3">>}}, 2_000), + ?assertReceive({publish, #{payload := <<"hello4">>}}, 2_000), + + ok = emqtt:disconnect(ConnShared2), + ok = emqtt:disconnect(ConnPub). + t_lease_reconnect(_Config) -> ConnPub = emqtt_connect_pub(<<"client_pub">>),