diff --git a/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl b/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl index 56246e743..7937c2fd4 100644 --- a/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl +++ b/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl @@ -221,9 +221,10 @@ t_session_subscription_idempotency(Config) -> end, fun(Trace) -> ct:pal("trace:\n ~p", [Trace]), + ConnInfo = #{}, ?assertMatch( #{subscriptions := #{SubTopicFilter := #{}}}, - erpc:call(Node1, emqx_persistent_session_ds, session_open, [ClientId]) + erpc:call(Node1, emqx_persistent_session_ds, session_open, [ClientId, ConnInfo]) ) end ), @@ -294,9 +295,10 @@ t_session_unsubscription_idempotency(Config) -> end, fun(Trace) -> ct:pal("trace:\n ~p", [Trace]), + ConnInfo = #{}, ?assertMatch( #{subscriptions := Subs = #{}} when map_size(Subs) =:= 0, - erpc:call(Node1, emqx_persistent_session_ds, session_open, [ClientId]) + erpc:call(Node1, emqx_persistent_session_ds, session_open, [ClientId, ConnInfo]) ), ok end @@ -387,3 +389,70 @@ do_t_session_discard(Params) -> end ), ok. + +t_session_expiration1(Config) -> + ClientId = atom_to_binary(?FUNCTION_NAME), + Opts = #{ + clientid => ClientId, + sequence => [ + {#{clean_start => false, properties => #{'Session-Expiry-Interval' => 30}}, #{}}, + {#{clean_start => false, properties => #{'Session-Expiry-Interval' => 1}}, #{}}, + {#{clean_start => false, properties => #{'Session-Expiry-Interval' => 30}}, #{}} + ] + }, + do_t_session_expiration(Config, Opts). + +t_session_expiration2(Config) -> + ClientId = atom_to_binary(?FUNCTION_NAME), + Opts = #{ + clientid => ClientId, + sequence => [ + {#{clean_start => false, properties => #{'Session-Expiry-Interval' => 30}}, #{}}, + {#{clean_start => false, properties => #{'Session-Expiry-Interval' => 30}}, #{ + 'Session-Expiry-Interval' => 1 + }}, + {#{clean_start => false, properties => #{'Session-Expiry-Interval' => 30}}, #{}} + ] + }, + do_t_session_expiration(Config, Opts). + +do_t_session_expiration(_Config, Opts) -> + #{ + clientid := ClientId, + sequence := [ + {FirstConn, FirstDisconn}, + {SecondConn, SecondDisconn}, + {ThirdConn, ThirdDisconn} + ] + } = Opts, + CommonParams = #{proto_ver => v5, clientid => ClientId}, + ?check_trace( + begin + Params0 = maps:merge(CommonParams, FirstConn), + Client0 = start_client(Params0), + {ok, _} = emqtt:connect(Client0), + Info0 = maps:from_list(emqtt:info(Client0)), + ?assertEqual(0, maps:get(session_present, Info0), #{info => Info0}), + emqtt:disconnect(Client0, ?RC_NORMAL_DISCONNECTION, FirstDisconn), + + Params1 = maps:merge(CommonParams, SecondConn), + Client1 = start_client(Params1), + {ok, _} = emqtt:connect(Client1), + Info1 = maps:from_list(emqtt:info(Client1)), + ?assertEqual(1, maps:get(session_present, Info1), #{info => Info1}), + emqtt:disconnect(Client1, ?RC_NORMAL_DISCONNECTION, SecondDisconn), + + ct:sleep(1_500), + + Params2 = maps:merge(CommonParams, ThirdConn), + Client2 = start_client(Params2), + {ok, _} = emqtt:connect(Client2), + Info2 = maps:from_list(emqtt:info(Client2)), + ?assertEqual(0, maps:get(session_present, Info2), #{info => Info2}), + emqtt:disconnect(Client2, ?RC_NORMAL_DISCONNECTION, ThirdDisconn), + + ok + end, + [] + ), + ok. diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 306341700..dd519568f 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -1204,12 +1204,13 @@ handle_info( #channel{ conn_state = ConnState, clientinfo = ClientInfo, + conninfo = ConnInfo, session = Session } ) when ConnState =:= connected orelse ConnState =:= reauthenticating -> - {Intent, Session1} = emqx_session:disconnect(ClientInfo, Session), + {Intent, Session1} = emqx_session:disconnect(ClientInfo, ConnInfo, Session), Channel1 = ensure_disconnected(Reason, maybe_publish_will_msg(Channel)), Channel2 = Channel1#channel{session = Session1}, case maybe_shutdown(Reason, Intent, Channel2) of @@ -1321,7 +1322,8 @@ handle_timeout( {ok, Replies, NSession} -> handle_out(publish, Replies, Channel#channel{session = NSession}) end; -handle_timeout(_TRef, expire_session, Channel) -> +handle_timeout(_TRef, expire_session, Channel = #channel{session = Session}) -> + ok = emqx_session:destroy(Session), shutdown(expired, Channel); handle_timeout( _TRef, diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 76b54e34a..3d38d5e60 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -56,7 +56,7 @@ deliver/3, replay/3, handle_timeout/3, - disconnect/1, + disconnect/2, terminate/2 ]). @@ -74,7 +74,7 @@ -ifdef(TEST). -export([ - session_open/1, + session_open/2, list_all_sessions/0, list_all_subscriptions/0, list_all_streams/0, @@ -98,19 +98,22 @@ id := id(), %% When the session was created created_at := timestamp(), - %% When the session should expire - expires_at := timestamp() | never, + %% When the client last disconnected + disconnected_at := timestamp() | never, %% Client’s Subscriptions. subscriptions := #{topic_filter() => subscription()}, %% Inflight messages inflight := emqx_persistent_message_ds_replayer:inflight(), %% Receive maximum receive_maximum := pos_integer(), + %% Connection Info + conninfo := emqx_types:conninfo(), %% props := map() }. -type timestamp() :: emqx_utils_calendar:epoch_millisecond(). +-type millisecond() :: non_neg_integer(). -type clientinfo() :: emqx_types:clientinfo(). -type conninfo() :: emqx_session:conninfo(). -type replies() :: emqx_session:replies(). @@ -123,6 +126,12 @@ next_pkt_id ]). +-define(IS_EXPIRED(NOW_MS, DISCONNECTED_AT, EI), + (is_number(DisconnectedAt) andalso + is_number(EI) andalso + (NowMS >= DisconnectedAt + EI)) +). + -export_type([id/0]). %% @@ -146,7 +155,7 @@ open(#{clientid := ClientID} = _ClientInfo, ConnInfo) -> ok = emqx_cm:discard_session(ClientID), case maps:get(clean_start, ConnInfo, false) of false -> - case session_open(ClientID) of + case session_open(ClientID, ConnInfo) of Session0 = #{} -> ensure_timers(), ReceiveMaximum = receive_maximum(ConnInfo), @@ -161,9 +170,13 @@ open(#{clientid := ClientID} = _ClientInfo, ConnInfo) -> end. ensure_session(ClientID, ConnInfo, Conf) -> - Session = session_ensure_new(ClientID, Conf), + Session = session_ensure_new(ClientID, ConnInfo, Conf), ReceiveMaximum = receive_maximum(ConnInfo), - Session#{subscriptions => #{}, receive_maximum => ReceiveMaximum}. + Session#{ + conninfo => ConnInfo, + receive_maximum => ReceiveMaximum, + subscriptions => #{} + }. -spec destroy(session() | clientinfo()) -> ok. destroy(#{id := ClientID}) -> @@ -399,8 +412,9 @@ replay(_ClientInfo, [], Session = #{inflight := Inflight0}) -> %%-------------------------------------------------------------------- --spec disconnect(session()) -> {shutdown, session()}. -disconnect(Session = #{}) -> +-spec disconnect(session(), emqx_types:conninfo()) -> {shutdown, session()}. +disconnect(Session0, ConnInfo) -> + Session = session_set_disconnected_at_trans(Session0, ConnInfo, now_ms()), {shutdown, Session}. -spec terminate(Reason :: term(), session()) -> ok. @@ -530,47 +544,80 @@ storage() -> %% %% Note: session API doesn't handle session takeovers, it's the job of %% the broker. --spec session_open(id()) -> +-spec session_open(id(), emqx_types:conninfo()) -> session() | false. -session_open(SessionId) -> - ro_transaction(fun() -> +session_open(SessionId, NewConnInfo) -> + NowMS = now_ms(), + transaction(fun() -> case mnesia:read(?SESSION_TAB, SessionId, write) of - [Record = #session{}] -> - Session = export_session(Record), - DSSubs = session_read_subscriptions(SessionId), - Subscriptions = export_subscriptions(DSSubs), - Inflight = emqx_persistent_message_ds_replayer:open(SessionId), - Session#{ - subscriptions => Subscriptions, - inflight => Inflight - }; - [] -> + [Record0 = #session{disconnected_at = DisconnectedAt, conninfo = ConnInfo}] -> + EI = expiry_interval(ConnInfo), + case ?IS_EXPIRED(NowMS, DisconnectedAt, EI) of + true -> + %% Should we drop the session now, or leave it to session GC? + false; + false -> + %% new connection being established + Record1 = Record0#session{conninfo = NewConnInfo}, + Record = session_set_disconnected_at(Record1, never), + Session = export_session(Record), + DSSubs = session_read_subscriptions(SessionId), + Subscriptions = export_subscriptions(DSSubs), + Inflight = emqx_persistent_message_ds_replayer:open(SessionId), + Session#{ + conninfo => NewConnInfo, + inflight => Inflight, + subscriptions => Subscriptions + } + end; + _ -> false end end). --spec session_ensure_new(id(), _Props :: map()) -> +-spec session_ensure_new(id(), emqx_types:conninfo(), _Props :: map()) -> session(). -session_ensure_new(SessionId, Props) -> +session_ensure_new(SessionId, ConnInfo, Props) -> transaction(fun() -> ok = session_drop_subscriptions(SessionId), - Session = export_session(session_create(SessionId, Props)), + Session = export_session(session_create(SessionId, ConnInfo, Props)), Session#{ subscriptions => #{}, inflight => emqx_persistent_message_ds_replayer:new() } end). -session_create(SessionId, Props) -> +session_create(SessionId, ConnInfo, Props) -> Session = #session{ id = SessionId, - created_at = erlang:system_time(millisecond), - expires_at = never, + created_at = now_ms(), + disconnected_at = never, + conninfo = ConnInfo, props = Props }, ok = mnesia:write(?SESSION_TAB, Session, write), Session. +session_set_disconnected_at_trans(Session, NewConnInfo, DisconnectedAt) -> + #{id := SessionId} = Session, + transaction(fun() -> + case mnesia:read(?SESSION_TAB, SessionId, write) of + [#session{} = SessionRecord0] -> + SessionRecord = SessionRecord0#session{conninfo = NewConnInfo}, + _ = session_set_disconnected_at(SessionRecord, DisconnectedAt), + ok; + _ -> + %% log and crash? + ok + end + end), + Session#{conninfo := NewConnInfo, disconnected_at := DisconnectedAt}. + +session_set_disconnected_at(SessionRecord0, DisconnectedAt) -> + SessionRecord = SessionRecord0#session{disconnected_at = DisconnectedAt}, + ok = mnesia:write(?SESSION_TAB, SessionRecord, write), + SessionRecord. + %% @doc Called when a client reconnects with `clean session=true' or %% during session GC -spec session_drop(id()) -> ok. @@ -673,7 +720,7 @@ session_read_pubranges(DSSessionId, LockKind) -> new_subscription_id(DSSessionId, TopicFilter) -> %% Note: here we use _milliseconds_ to match with the timestamp %% field of `#message' record. - NowMS = erlang:system_time(millisecond), + NowMS = now_ms(), DSSubId = {DSSessionId, TopicFilter}, {DSSubId, NowMS}. @@ -681,6 +728,9 @@ new_subscription_id(DSSessionId, TopicFilter) -> subscription_id_to_topic_filter({_DSSessionId, TopicFilter}) -> TopicFilter. +now_ms() -> + erlang:system_time(millisecond). + %%-------------------------------------------------------------------- %% RPC targets (v1) %%-------------------------------------------------------------------- @@ -800,7 +850,7 @@ export_subscriptions(DSSubs) -> ). export_session(#session{} = Record) -> - export_record(Record, #session.id, [id, created_at, expires_at, props], #{}). + export_record(Record, #session.id, [id, created_at, disconnected_at, conninfo, props], #{}). export_subscription(#ds_sub{} = Record) -> export_record(Record, #ds_sub.start_time, [start_time, props, extra], #{}). @@ -832,11 +882,16 @@ receive_maximum(ConnInfo) -> %% indicates that it's optional. maps:get(receive_maximum, ConnInfo, 65_535). +-spec expiry_interval(conninfo()) -> millisecond(). +expiry_interval(ConnInfo) -> + maps:get(expiry_interval, ConnInfo, 0). + -ifdef(TEST). list_all_sessions() -> DSSessionIds = mnesia:dirty_all_keys(?SESSION_TAB), + ConnInfo = #{}, Sessions = lists:map( - fun(SessionID) -> {SessionID, session_open(SessionID)} end, + fun(SessionID) -> {SessionID, session_open(SessionID, ConnInfo)} end, DSSessionIds ), maps:from_list(Sessions). diff --git a/apps/emqx/src/emqx_persistent_session_ds.hrl b/apps/emqx/src/emqx_persistent_session_ds.hrl index 653ac444a..cbdc00c09 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.hrl +++ b/apps/emqx/src/emqx_persistent_session_ds.hrl @@ -73,7 +73,8 @@ id :: emqx_persistent_session_ds:id(), %% creation time created_at :: _Millisecond :: non_neg_integer(), - expires_at = never :: _Millisecond :: non_neg_integer() | never, + disconnected_at = never :: _Millisecond :: non_neg_integer() | never, + conninfo :: emqx_types:conninfo(), %% for future usage props = #{} :: map() }). diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index 64ef2e30d..108e8ec09 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -84,7 +84,7 @@ -export([ deliver/3, handle_timeout/3, - disconnect/2, + disconnect/3, terminate/3 ]). @@ -503,10 +503,10 @@ cancel_timer(Name, Timers) -> %%-------------------------------------------------------------------- --spec disconnect(clientinfo(), t()) -> +-spec disconnect(clientinfo(), eqmx_types:conninfo(), t()) -> {idle | shutdown, t()}. -disconnect(_ClientInfo, Session) -> - ?IMPL(Session):disconnect(Session). +disconnect(_ClientInfo, ConnInfo, Session) -> + ?IMPL(Session):disconnect(Session, ConnInfo). -spec terminate(clientinfo(), Reason :: term(), t()) -> ok. diff --git a/apps/emqx/src/emqx_session_mem.erl b/apps/emqx/src/emqx_session_mem.erl index d609435c0..178c71e12 100644 --- a/apps/emqx/src/emqx_session_mem.erl +++ b/apps/emqx/src/emqx_session_mem.erl @@ -87,7 +87,7 @@ deliver/3, replay/3, handle_timeout/3, - disconnect/1, + disconnect/2, terminate/2 ]). @@ -725,8 +725,8 @@ append(L1, L2) -> L1 ++ L2. %%-------------------------------------------------------------------- --spec disconnect(session()) -> {idle, session()}. -disconnect(Session = #session{}) -> +-spec disconnect(session(), emqx_types:conninfo()) -> {idle, session()}. +disconnect(Session = #session{}, _ConnInfo) -> % TODO: isolate expiry timer / timeout handling here? {idle, Session}. diff --git a/apps/emqx/test/emqx_persistent_session_SUITE.erl b/apps/emqx/test/emqx_persistent_session_SUITE.erl index 1be929c7f..b4946b7d3 100644 --- a/apps/emqx/test/emqx_persistent_session_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_session_SUITE.erl @@ -347,8 +347,6 @@ t_connect_discards_existing_client(Config) -> end. %% [MQTT-3.1.2-23] -t_connect_session_expiry_interval(init, Config) -> skip_ds_tc(Config); -t_connect_session_expiry_interval('end', _Config) -> ok. t_connect_session_expiry_interval(Config) -> ConnFun = ?config(conn_fun, Config), Topic = ?config(topic, Config), @@ -356,6 +354,45 @@ t_connect_session_expiry_interval(Config) -> Payload = <<"test message">>, ClientId = ?config(client_id, Config), + {ok, Client1} = emqtt:start_link([ + {clientid, ClientId}, + {proto_ver, v5}, + {properties, #{'Session-Expiry-Interval' => 30}} + | Config + ]), + {ok, _} = emqtt:ConnFun(Client1), + {ok, _, [?RC_GRANTED_QOS_1]} = emqtt:subscribe(Client1, STopic, ?QOS_1), + ok = emqtt:disconnect(Client1), + + maybe_kill_connection_process(ClientId, Config), + + publish(Topic, Payload, ?QOS_1), + + {ok, Client2} = emqtt:start_link([ + {clientid, ClientId}, + {proto_ver, v5}, + {properties, #{'Session-Expiry-Interval' => 30}}, + {clean_start, false} + | Config + ]), + {ok, _} = emqtt:ConnFun(Client2), + [Msg | _] = receive_messages(1), + ?assertEqual({ok, iolist_to_binary(Topic)}, maps:find(topic, Msg)), + ?assertEqual({ok, iolist_to_binary(Payload)}, maps:find(payload, Msg)), + ?assertEqual({ok, ?QOS_1}, maps:find(qos, Msg)), + ok = emqtt:disconnect(Client2). + +%% [MQTT-3.1.2-23] +%% TODO: un-skip after QoS 2 support is implemented in DS. +t_connect_session_expiry_interval_qos2(init, Config) -> skip_ds_tc(Config); +t_connect_session_expiry_interval_qos2('end', _Config) -> ok. +t_connect_session_expiry_interval_qos2(Config) -> + ConnFun = ?config(conn_fun, Config), + Topic = ?config(topic, Config), + STopic = ?config(stopic, Config), + Payload = <<"test message">>, + ClientId = ?config(client_id, Config), + {ok, Client1} = emqtt:start_link([ {clientid, ClientId}, {proto_ver, v5},