From 3c451c6ae60cc455b8d9c082866d5fca7ed359c6 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Tue, 16 Jan 2024 17:02:06 +0100 Subject: [PATCH] test(sessds): Fix unstable tests --- .../emqx_persistent_session_ds_SUITE.erl | 120 +++++++----------- apps/emqx/src/emqx_persistent_session_ds.erl | 34 +++-- apps/emqx/src/emqx_persistent_session_ds.hrl | 7 + .../emqx_persistent_session_ds_gc_worker.erl | 33 +++-- .../src/emqx_persistent_session_ds_state.erl | 24 ++-- ...persistent_session_ds_stream_scheduler.erl | 8 +- .../test/emqx_persistent_session_SUITE.erl | 68 +++++----- changes/ce/feat-12251.en.md | 2 +- 8 files changed, 138 insertions(+), 158 deletions(-) diff --git a/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl b/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl index f834b8098..40ffe7f32 100644 --- a/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl +++ b/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl @@ -91,7 +91,7 @@ end_per_testcase(_TestCase, _Config) -> ok. %%------------------------------------------------------------------------------ -%% Helper fns +%% Helper functions %%------------------------------------------------------------------------------ cluster(#{n := N} = Opts) -> @@ -147,9 +147,9 @@ start_client(Opts0 = #{}) -> proto_ver => v5, properties => #{'Session-Expiry-Interval' => 300} }, - Opts = maps:to_list(emqx_utils_maps:deep_merge(Defaults, Opts0)), - ct:pal("starting client with opts:\n ~p", [Opts]), - {ok, Client} = emqtt:start_link(Opts), + Opts = emqx_utils_maps:deep_merge(Defaults, Opts0), + ?tp(notice, "starting client", Opts), + {ok, Client} = emqtt:start_link(maps:to_list(Opts)), unlink(Client), on_exit(fun() -> catch emqtt:stop(Client) end), Client. @@ -186,33 +186,6 @@ list_all_pubranges(Node) -> %% Testcases %%------------------------------------------------------------------------------ -t_non_persistent_session_subscription(_Config) -> - ClientId = atom_to_binary(?FUNCTION_NAME), - SubTopicFilter = <<"t/#">>, - ?check_trace( - #{timetrap => 30_000}, - begin - ?tp(notice, "starting", #{}), - Client = start_client(#{ - clientid => ClientId, - properties => #{'Session-Expiry-Interval' => 0} - }), - {ok, _} = emqtt:connect(Client), - ?tp(notice, "subscribing", #{}), - {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(Client, SubTopicFilter, qos2), - - ok = emqtt:stop(Client), - - ok - end, - fun(Trace) -> - ct:pal("trace:\n ~p", [Trace]), - ?assertEqual([], ?of_kind(ds_session_subscription_added, Trace)), - ok - end - ), - ok. - t_session_subscription_idempotency(Config) -> [Node1Spec | _] = ?config(node_specs, Config), [Node1] = ?config(nodes, Config), @@ -222,7 +195,6 @@ t_session_subscription_idempotency(Config) -> ?check_trace( #{timetrap => 30_000}, begin - #{timetrap => 20_000}, ?force_ordering( #{?snk_kind := persistent_session_ds_subscription_added}, _NEvents0 = 1, @@ -553,14 +525,14 @@ t_session_gc(Config) -> ?check_trace( #{timetrap => 30_000}, begin - ClientId0 = <<"session_gc0">>, - Client0 = StartClient(ClientId0, Port1, 30), - ClientId1 = <<"session_gc1">>, - Client1 = StartClient(ClientId1, Port2, 1), + Client1 = StartClient(ClientId1, Port1, 30), ClientId2 = <<"session_gc2">>, - Client2 = StartClient(ClientId2, Port3, 1), + Client2 = StartClient(ClientId2, Port2, 1), + + ClientId3 = <<"session_gc3">>, + Client3 = StartClient(ClientId3, Port3, 1), lists:foreach( fun(Client) -> @@ -570,52 +542,41 @@ t_session_gc(Config) -> {ok, _} = emqtt:publish(Client, Topic, Payload, ?QOS_1), ok end, - [Client0, Client1, Client2] + [Client1, Client2, Client3] ), %% Clients are still alive; no session is garbage collected. - Res0 = ?block_until( - #{ - ?snk_kind := ds_session_gc, - ?snk_span := {complete, _}, - ?snk_meta := #{node := N} - } when - N =/= node(), - 3 * GCInterval + 1_000 - ), - ?assertMatch({ok, _}, Res0), - {ok, #{?snk_meta := #{time := T0}}} = Res0, - ?assertMatch([_, _, _], list_all_sessions(Node1), sessions), - ?assertMatch([_, _, _], list_all_subscriptions(Node1), subscriptions), - - %% Now we disconnect 2 of them; only those should be GC'ed. - ?assertMatch( - {ok, {ok, _}}, - ?wait_async_action( - emqtt:stop(Client1), - #{?snk_kind := terminate}, - 1_000 - ) - ), - ct:pal("disconnected client1"), - ?assertMatch( - {ok, {ok, _}}, - ?wait_async_action( - emqtt:stop(Client2), - #{?snk_kind := terminate}, - 1_000 - ) - ), - ct:pal("disconnected client2"), ?assertMatch( {ok, _}, ?block_until( #{ - ?snk_kind := ds_session_gc_cleaned, - session_id := ClientId1 - } + ?snk_kind := ds_session_gc, + ?snk_span := {complete, _}, + ?snk_meta := #{node := N} + } when N =/= node() ) ), + ?assertMatch([_, _, _], list_all_sessions(Node1), sessions), + ?assertMatch([_, _, _], list_all_subscriptions(Node1), subscriptions), + + %% Now we disconnect 2 of them; only those should be GC'ed. + + ?assertMatch( + {ok, {ok, _}}, + ?wait_async_action( + emqtt:stop(Client2), + #{?snk_kind := terminate} + ) + ), + ?tp(notice, "disconnected client1", #{}), + ?assertMatch( + {ok, {ok, _}}, + ?wait_async_action( + emqtt:stop(Client3), + #{?snk_kind := terminate} + ) + ), + ?tp(notice, "disconnected client2", #{}), ?assertMatch( {ok, _}, ?block_until( @@ -625,7 +586,16 @@ t_session_gc(Config) -> } ) ), - ?assertMatch([_], list_all_sessions(Node1), sessions), + ?assertMatch( + {ok, _}, + ?block_until( + #{ + ?snk_kind := ds_session_gc_cleaned, + session_id := ClientId3 + } + ) + ), + ?assertMatch([ClientId1], list_all_sessions(Node1), sessions), ?assertMatch([_], list_all_subscriptions(Node1), subscriptions), ok end, diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 9f77c4219..6f32b1549 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -79,7 +79,7 @@ do_ensure_all_iterators_closed/1 ]). --export([print_session/1]). +-export([print_session/1, seqno_diff/4]). -ifdef(TEST). -export([ @@ -152,8 +152,7 @@ inflight_cnt, inflight_max, mqueue_len, - mqueue_dropped, - awaiting_rel_cnt + mqueue_dropped ]). %% @@ -233,8 +232,8 @@ info(mqueue_dropped, _Session) -> %% PacketId; % info(awaiting_rel, #sessmem{awaiting_rel = AwaitingRel}) -> % AwaitingRel; -info(awaiting_rel_cnt, #{s := S}) -> - seqno_diff(?QOS_2, ?rec, ?committed(?QOS_2), S); +%% info(awaiting_rel_cnt, #{s := S}) -> +%% seqno_diff(?QOS_2, ?rec, ?committed(?QOS_2), S); info(awaiting_rel_max, #{props := Conf}) -> maps:get(max_awaiting_rel, Conf); info(await_rel_timeout, #{props := Conf}) -> @@ -271,6 +270,8 @@ subscribe( ) -> case subs_lookup(TopicFilter, S0) of undefined -> + %% TODO: max subscriptions + %% N.B.: we chose to update the router before adding the %% subscription to the session/iterator table. The %% reasoning for this is as follows: @@ -511,16 +512,21 @@ replay_batch(Srs0, Session, ClientInfo) -> -spec disconnect(session(), emqx_types:conninfo()) -> {shutdown, session()}. disconnect(Session = #{s := S0}, ConnInfo) -> - OldConnInfo = emqx_persistent_session_ds_state:get_conninfo(S0), - NewConnInfo = maps:merge(OldConnInfo, maps:with([expiry_interval], ConnInfo)), S1 = emqx_persistent_session_ds_state:set_last_alive_at(now_ms(), S0), - S2 = emqx_persistent_session_ds_state:set_conninfo(NewConnInfo, S1), + S2 = + case ConnInfo of + #{expiry_interval := EI} when is_number(EI) -> + emqx_persistent_session_ds_state:set_expiry_interval(EI, S1); + _ -> + S1 + end, S = emqx_persistent_session_ds_state:commit(S2), {shutdown, Session#{s => S}}. -spec terminate(Reason :: term(), session()) -> ok. -terminate(_Reason, _Session = #{s := S}) -> +terminate(_Reason, _Session = #{id := Id, s := S}) -> _ = emqx_persistent_session_ds_state:commit(S), + ?tp(debug, persistent_session_ds_terminate, #{id => Id}), ok. %%-------------------------------------------------------------------- @@ -558,7 +564,7 @@ session_open(SessionId, NewConnInfo) -> NowMS = now_ms(), case emqx_persistent_session_ds_state:open(SessionId) of {ok, S0} -> - EI = expiry_interval(emqx_persistent_session_ds_state:get_conninfo(S0)), + EI = emqx_persistent_session_ds_state:get_expiry_interval(S0), LastAliveAt = emqx_persistent_session_ds_state:get_last_alive_at(S0), case NowMS >= LastAliveAt + EI of true -> @@ -567,7 +573,7 @@ session_open(SessionId, NewConnInfo) -> false -> ?tp(open_session, #{ei => EI, now => NowMS, laa => LastAliveAt}), %% New connection being established - S1 = emqx_persistent_session_ds_state:set_conninfo(NewConnInfo, S0), + S1 = emqx_persistent_session_ds_state:set_expiry_interval(EI, S0), S2 = emqx_persistent_session_ds_state:set_last_alive_at(NowMS, S1), S = emqx_persistent_session_ds_state:commit(S2), Inflight = emqx_persistent_session_ds_inflight:new( @@ -587,9 +593,10 @@ session_open(SessionId, NewConnInfo) -> -spec session_ensure_new(id(), emqx_types:conninfo(), emqx_session:conf()) -> session(). session_ensure_new(Id, ConnInfo, Conf) -> + ?tp(debug, persistent_session_ds_ensure_new, #{id => Id}), Now = now_ms(), S0 = emqx_persistent_session_ds_state:create_new(Id), - S1 = emqx_persistent_session_ds_state:set_conninfo(ConnInfo, S0), + S1 = emqx_persistent_session_ds_state:set_expiry_interval(expiry_interval(ConnInfo), S0), S2 = bump_last_alive(S1), S3 = emqx_persistent_session_ds_state:set_created_at(Now, S2), S4 = lists:foldl( @@ -970,8 +977,7 @@ inc_seqno(Qos, SeqNo) -> NextSeqno end. -%% Note: we use the least significant bit to store the QoS. Even -%% packet IDs are QoS1, odd packet IDs are QoS2. +%% Note: we use the most significant bit to store the QoS. seqno_to_packet_id(?QOS_1, SeqNo) -> SeqNo band ?PACKET_ID_MASK; seqno_to_packet_id(?QOS_2, SeqNo) -> diff --git a/apps/emqx/src/emqx_persistent_session_ds.hrl b/apps/emqx/src/emqx_persistent_session_ds.hrl index f097b2c6e..fa4bfacf1 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.hrl +++ b/apps/emqx/src/emqx_persistent_session_ds.hrl @@ -65,4 +65,11 @@ last_seqno_qos2 = 0 :: emqx_persistent_session_ds:seqno() }). +%% Session metadata keys: +-define(created_at, created_at). +-define(last_alive_at, last_alive_at). +-define(expiry_interval, expiry_interval). +%% Unique integer used to create unique identities +-define(last_id, last_id). + -endif. diff --git a/apps/emqx/src/emqx_persistent_session_ds_gc_worker.erl b/apps/emqx/src/emqx_persistent_session_ds_gc_worker.erl index 66f5c9b4e..a4d1fe638 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_gc_worker.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_gc_worker.erl @@ -69,7 +69,7 @@ handle_info(_Info, State) -> {noreply, State}. %%-------------------------------------------------------------------------------- -%% Internal fns +%% Internal functions %%-------------------------------------------------------------------------------- ensure_gc_timer() -> @@ -116,22 +116,21 @@ gc_loop(MinLastAlive, It0) -> {[], _It} -> ok; {Sessions, It} -> - [ - do_gc(SessionId, MinLastAlive, LastAliveAt, EI) - || {SessionId, #{last_alive_at := LastAliveAt, conninfo := #{expiry_interval := EI}}} <- - Sessions - ], + [do_gc(SessionId, MinLastAlive, Metadata) || {SessionId, Metadata} <- Sessions], gc_loop(MinLastAlive, It) end. -do_gc(SessionId, MinLastAlive, LastAliveAt, EI) when LastAliveAt + EI < MinLastAlive -> - emqx_persistent_session_ds:destroy_session(SessionId), - ?tp(debug, ds_session_gc_cleaned, #{ - session_id => SessionId, - last_alive_at => LastAliveAt, - expiry_interval => EI, - min_last_alive => MinLastAlive - }), - ok; -do_gc(_SessionId, _MinLastAliveAt, _LastAliveAt, _EI) -> - ok. +do_gc(SessionId, MinLastAlive, Metadata) -> + #{?last_alive_at := LastAliveAt, ?expiry_interval := EI} = Metadata, + case LastAliveAt + EI < MinLastAlive of + true -> + emqx_persistent_session_ds:destroy_session(SessionId), + ?tp(debug, ds_session_gc_cleaned, #{ + session_id => SessionId, + last_alive_at => LastAliveAt, + expiry_interval => EI, + min_last_alive => MinLastAlive + }); + false -> + ok + end. diff --git a/apps/emqx/src/emqx_persistent_session_ds_state.erl b/apps/emqx/src/emqx_persistent_session_ds_state.erl index a4b349c9e..32d661354 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_state.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_state.erl @@ -29,7 +29,7 @@ -export([open/1, create_new/1, delete/1, commit/1, format/1, print_session/1, list_sessions/0]). -export([get_created_at/1, set_created_at/2]). -export([get_last_alive_at/1, set_last_alive_at/2]). --export([get_conninfo/1, set_conninfo/2]). +-export([get_expiry_interval/1, set_expiry_interval/2]). -export([new_id/1]). -export([get_stream/2, put_stream/3, del_stream/2, fold_streams/3]). -export([get_seqno/2, put_seqno/3]). @@ -81,18 +81,11 @@ dirty :: #{K => dirty | del} }. -%% Session metadata: --define(created_at, created_at). --define(last_alive_at, last_alive_at). --define(conninfo, conninfo). -%% Unique integer used to create unique identities --define(last_id, last_id). - -type metadata() :: #{ ?created_at => emqx_persistent_session_ds:timestamp(), ?last_alive_at => emqx_persistent_session_ds:timestamp(), - ?conninfo => emqx_types:conninfo(), + ?expiry_interval => emqx_types:conninfo(), ?last_id => integer() }. @@ -122,6 +115,7 @@ -define(rank_tab, emqx_ds_session_ranks). -define(pmap_tables, [?stream_tab, ?seqno_tab, ?rank_tab, ?subscription_tab]). +%% Enable this flag if you suspect some code breaks the sequence: -ifndef(CHECK_SEQNO). -define(set_dirty, dirty => true). -define(unset_dirty, dirty => false). @@ -268,13 +262,13 @@ get_last_alive_at(Rec) -> set_last_alive_at(Val, Rec) -> set_meta(?last_alive_at, Val, Rec). --spec get_conninfo(t()) -> emqx_types:conninfo() | undefined. -get_conninfo(Rec) -> - get_meta(?conninfo, Rec). +-spec get_expiry_interval(t()) -> non_neg_integer() | undefined. +get_expiry_interval(Rec) -> + get_meta(?expiry_interval, Rec). --spec set_conninfo(emqx_types:conninfo(), t()) -> t(). -set_conninfo(Val, Rec) -> - set_meta(?conninfo, Val, Rec). +-spec set_expiry_interval(non_neg_integer(), t()) -> t(). +set_expiry_interval(Val, Rec) -> + set_meta(?expiry_interval, Val, Rec). -spec new_id(t()) -> {emqx_persistent_session_ds:subscription_id(), t()}. new_id(Rec) -> diff --git a/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl b/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl index 5df56843f..b321b324b 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl @@ -201,12 +201,10 @@ remove_fully_replayed_streams(S0) -> Groups = emqx_persistent_session_ds_state:fold_streams( fun({SubId, _Stream}, StreamState = #srs{rank_x = RankX, rank_y = RankY}, Acc) -> Key = {SubId, RankX}, - case - {maps:get(Key, Acc, undefined), is_fully_replayed(CommQos1, CommQos2, StreamState)} - of - {undefined, true} -> + case is_fully_replayed(CommQos1, CommQos2, StreamState) of + true when is_map_key(Key, Acc) -> Acc#{Key => {true, RankY}}; - {_, false} -> + true -> Acc#{Key => false}; _ -> Acc diff --git a/apps/emqx/test/emqx_persistent_session_SUITE.erl b/apps/emqx/test/emqx_persistent_session_SUITE.erl index 3fc76d4b5..730fdd297 100644 --- a/apps/emqx/test/emqx_persistent_session_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_session_SUITE.erl @@ -74,6 +74,7 @@ init_per_group(persistence_enabled, Config) -> {emqx_config, "session_persistence {\n" " enable = true\n" + " last_alive_update_interval = 100ms\n" " renew_streams_interval = 100ms\n" "}"}, {persistence, ds} @@ -534,42 +535,47 @@ t_process_dies_session_expires(Config) -> %% Emulate an error in the connect process, %% or that the node of the process goes down. %% A persistent session should eventually expire. - ConnFun = ?config(conn_fun, Config), - ClientId = ?config(client_id, Config), - Topic = ?config(topic, Config), - STopic = ?config(stopic, Config), - Payload = <<"test">>, - {ok, Client1} = emqtt:start_link([ - {proto_ver, v5}, - {clientid, ClientId}, - {properties, #{'Session-Expiry-Interval' => 1}}, - {clean_start, true} - | Config - ]), - {ok, _} = emqtt:ConnFun(Client1), - {ok, _, [2]} = emqtt:subscribe(Client1, STopic, qos2), - ok = emqtt:disconnect(Client1), + ?check_trace( + begin + ConnFun = ?config(conn_fun, Config), + ClientId = ?config(client_id, Config), + Topic = ?config(topic, Config), + STopic = ?config(stopic, Config), + Payload = <<"test">>, + {ok, Client1} = emqtt:start_link([ + {proto_ver, v5}, + {clientid, ClientId}, + {properties, #{'Session-Expiry-Interval' => 1}}, + {clean_start, true} + | Config + ]), + {ok, _} = emqtt:ConnFun(Client1), + {ok, _, [2]} = emqtt:subscribe(Client1, STopic, qos2), + ok = emqtt:disconnect(Client1), - maybe_kill_connection_process(ClientId, Config), + maybe_kill_connection_process(ClientId, Config), - ok = publish(Topic, Payload), + ok = publish(Topic, Payload), - timer:sleep(2000), + timer:sleep(1500), - {ok, Client2} = emqtt:start_link([ - {proto_ver, v5}, - {clientid, ClientId}, - {properties, #{'Session-Expiry-Interval' => 30}}, - {clean_start, false} - | Config - ]), - {ok, _} = emqtt:ConnFun(Client2), - ?assertEqual(0, client_info(session_present, Client2)), + {ok, Client2} = emqtt:start_link([ + {proto_ver, v5}, + {clientid, ClientId}, + {properties, #{'Session-Expiry-Interval' => 30}}, + {clean_start, false} + | Config + ]), + {ok, _} = emqtt:ConnFun(Client2), + ?assertEqual(0, client_info(session_present, Client2)), - %% We should not receive the pending message - ?assertEqual([], receive_messages(1)), + %% We should not receive the pending message + ?assertEqual([], receive_messages(1)), - emqtt:disconnect(Client2). + emqtt:disconnect(Client2) + end, + [] + ). t_publish_while_client_is_gone_qos1(Config) -> %% A persistent session should receive messages in its @@ -644,7 +650,7 @@ t_publish_many_while_client_is_gone_qos1(Config) -> #mqtt_msg{topic = <<"loc/1/2/42">>, payload = <<"M4">>, qos = 1}, #mqtt_msg{topic = <<"t/42/foo">>, payload = <<"M5">>, qos = 1}, #mqtt_msg{topic = <<"loc/3/4/5">>, payload = <<"M6">>, qos = 1}, - #mqtt_msg{topic = <<"msg/feed/me2">>, payload = <<"M7">>, qos = 1} + #mqtt_msg{topic = <<"msg/feed/me">>, payload = <<"M7">>, qos = 1} ], ok = publish_many(Pubs1), NPubs1 = length(Pubs1), diff --git a/changes/ce/feat-12251.en.md b/changes/ce/feat-12251.en.md index a206288b5..b35c44a01 100644 --- a/changes/ce/feat-12251.en.md +++ b/changes/ce/feat-12251.en.md @@ -4,4 +4,4 @@ Reduce RAM usage and frequency of database requests. - Introduce dirty session state to avoid frequent mria transactions - Introduce an intermediate buffer for the persistent messages - Use separate tracks of PacketIds for QoS1 and QoS2 messages -- Limit the number of continuous ranges of infligtht messages to one per stream +- Limit the number of continuous ranges of inflight messages to one per stream