From 09c4e40511aa5591cf231e90f075d57b35e2eec1 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 27 Nov 2023 11:48:44 -0300 Subject: [PATCH] refactor(ds): rename `disconnected_at` to `last_alive_at`, add more assertions --- .../emqx_persistent_session_ds_SUITE.erl | 13 ++++++++ apps/emqx/src/emqx_persistent_session_ds.erl | 32 +++++++++---------- apps/emqx/src/emqx_persistent_session_ds.hrl | 2 +- 3 files changed, 30 insertions(+), 17 deletions(-) diff --git a/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl b/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl index 7937c2fd4..05c1eb8f2 100644 --- a/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl +++ b/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl @@ -9,6 +9,7 @@ -include_lib("stdlib/include/assert.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-include_lib("emqx/include/asserts.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). -import(emqx_common_test_helpers, [on_exit/1]). @@ -428,9 +429,13 @@ do_t_session_expiration(_Config, Opts) -> CommonParams = #{proto_ver => v5, clientid => ClientId}, ?check_trace( begin + Topic = <<"some/topic">>, Params0 = maps:merge(CommonParams, FirstConn), Client0 = start_client(Params0), {ok, _} = emqtt:connect(Client0), + {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(Client0, Topic, ?QOS_2), + Subs0 = emqx_persistent_session_ds:list_all_subscriptions(), + ?assertEqual(1, map_size(Subs0), #{subs => Subs0}), Info0 = maps:from_list(emqtt:info(Client0)), ?assertEqual(0, maps:get(session_present, Info0), #{info => Info0}), emqtt:disconnect(Client0, ?RC_NORMAL_DISCONNECTION, FirstDisconn), @@ -440,6 +445,8 @@ do_t_session_expiration(_Config, Opts) -> {ok, _} = emqtt:connect(Client1), Info1 = maps:from_list(emqtt:info(Client1)), ?assertEqual(1, maps:get(session_present, Info1), #{info => Info1}), + Subs1 = emqtt:subscriptions(Client1), + ?assertEqual([], Subs1), emqtt:disconnect(Client1, ?RC_NORMAL_DISCONNECTION, SecondDisconn), ct:sleep(1_500), @@ -449,6 +456,12 @@ do_t_session_expiration(_Config, Opts) -> {ok, _} = emqtt:connect(Client2), Info2 = maps:from_list(emqtt:info(Client2)), ?assertEqual(0, maps:get(session_present, Info2), #{info => Info2}), + Subs2 = emqtt:subscriptions(Client2), + ?assertEqual([], Subs2), + emqtt:publish(Client2, Topic, <<"payload">>), + ?assertNotReceive({publish, #{topic := Topic}}), + %% ensure subscriptions are absent from table. + ?assertEqual(#{}, emqx_persistent_session_ds:list_all_subscriptions()), emqtt:disconnect(Client2, ?RC_NORMAL_DISCONNECTION, ThirdDisconn), ok diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index e0be4eefc..1429d6e97 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -98,8 +98,8 @@ id := id(), %% When the session was created created_at := timestamp(), - %% When the client last disconnected - disconnected_at := timestamp() | never, + %% When the client was last considered alive + last_alive_at := timestamp(), %% Client’s Subscriptions. subscriptions := #{topic_filter() => subscription()}, %% Inflight messages @@ -126,10 +126,10 @@ next_pkt_id ]). --define(IS_EXPIRED(NOW_MS, DISCONNECTED_AT, EI), - (is_number(DisconnectedAt) andalso +-define(IS_EXPIRED(NOW_MS, LAST_ALIVE_AT, EI), + (is_number(LAST_ALIVE_AT) andalso is_number(EI) andalso - (NowMS >= DisconnectedAt + EI)) + (NOW_MS >= LAST_ALIVE_AT + EI)) ). -export_type([id/0]). @@ -408,7 +408,7 @@ replay(_ClientInfo, [], Session = #{inflight := Inflight0}) -> -spec disconnect(session(), emqx_types:conninfo()) -> {shutdown, session()}. disconnect(Session0, ConnInfo) -> - Session = session_set_disconnected_at_trans(Session0, ConnInfo, now_ms()), + Session = session_set_last_alive_at_trans(Session0, ConnInfo, now_ms()), {shutdown, Session}. -spec terminate(Reason :: term(), session()) -> ok. @@ -544,16 +544,16 @@ session_open(SessionId, NewConnInfo) -> NowMS = now_ms(), transaction(fun() -> case mnesia:read(?SESSION_TAB, SessionId, write) of - [Record0 = #session{disconnected_at = DisconnectedAt, conninfo = ConnInfo}] -> + [Record0 = #session{last_alive_at = LastAliveAt, conninfo = ConnInfo}] -> EI = expiry_interval(ConnInfo), - case ?IS_EXPIRED(NowMS, DisconnectedAt, EI) of + case ?IS_EXPIRED(NowMS, LastAliveAt, EI) of true -> session_drop(SessionId), false; false -> %% new connection being established Record1 = Record0#session{conninfo = NewConnInfo}, - Record = session_set_disconnected_at(Record1, never), + Record = session_set_last_alive_at(Record1, never), Session = export_session(Record), DSSubs = session_read_subscriptions(SessionId), Subscriptions = export_subscriptions(DSSubs), @@ -585,30 +585,30 @@ session_create(SessionId, ConnInfo, Props) -> Session = #session{ id = SessionId, created_at = now_ms(), - disconnected_at = never, + last_alive_at = now_ms(), conninfo = ConnInfo, props = Props }, ok = mnesia:write(?SESSION_TAB, Session, write), Session. -session_set_disconnected_at_trans(Session, NewConnInfo, DisconnectedAt) -> +session_set_last_alive_at_trans(Session, NewConnInfo, LastAliveAt) -> #{id := SessionId} = Session, transaction(fun() -> case mnesia:read(?SESSION_TAB, SessionId, write) of [#session{} = SessionRecord0] -> SessionRecord = SessionRecord0#session{conninfo = NewConnInfo}, - _ = session_set_disconnected_at(SessionRecord, DisconnectedAt), + _ = session_set_last_alive_at(SessionRecord, LastAliveAt), ok; _ -> %% log and crash? ok end end), - Session#{conninfo := NewConnInfo, disconnected_at := DisconnectedAt}. + Session#{conninfo := NewConnInfo, last_alive_at := LastAliveAt}. -session_set_disconnected_at(SessionRecord0, DisconnectedAt) -> - SessionRecord = SessionRecord0#session{disconnected_at = DisconnectedAt}, +session_set_last_alive_at(SessionRecord0, LastAliveAt) -> + SessionRecord = SessionRecord0#session{last_alive_at = LastAliveAt}, ok = mnesia:write(?SESSION_TAB, SessionRecord, write), SessionRecord. @@ -849,7 +849,7 @@ export_subscriptions(DSSubs) -> ). export_session(#session{} = Record) -> - export_record(Record, #session.id, [id, created_at, disconnected_at, conninfo, props], #{}). + export_record(Record, #session.id, [id, created_at, last_alive_at, conninfo, props], #{}). export_subscription(#ds_sub{} = Record) -> export_record(Record, #ds_sub.start_time, [start_time, props, extra], #{}). diff --git a/apps/emqx/src/emqx_persistent_session_ds.hrl b/apps/emqx/src/emqx_persistent_session_ds.hrl index cbdc00c09..375bea97f 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.hrl +++ b/apps/emqx/src/emqx_persistent_session_ds.hrl @@ -73,7 +73,7 @@ id :: emqx_persistent_session_ds:id(), %% creation time created_at :: _Millisecond :: non_neg_integer(), - disconnected_at = never :: _Millisecond :: non_neg_integer() | never, + last_alive_at :: _Millisecond :: non_neg_integer(), conninfo :: emqx_types:conninfo(), %% for future usage props = #{} :: map()