test(sessds): Fix unstable tests

This commit is contained in:
ieQu1 2024-01-16 17:02:06 +01:00
parent 2d23212792
commit 3c451c6ae6
8 changed files with 138 additions and 158 deletions

View File

@ -91,7 +91,7 @@ end_per_testcase(_TestCase, _Config) ->
ok. ok.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Helper fns %% Helper functions
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
cluster(#{n := N} = Opts) -> cluster(#{n := N} = Opts) ->
@ -147,9 +147,9 @@ start_client(Opts0 = #{}) ->
proto_ver => v5, proto_ver => v5,
properties => #{'Session-Expiry-Interval' => 300} properties => #{'Session-Expiry-Interval' => 300}
}, },
Opts = maps:to_list(emqx_utils_maps:deep_merge(Defaults, Opts0)), Opts = emqx_utils_maps:deep_merge(Defaults, Opts0),
ct:pal("starting client with opts:\n ~p", [Opts]), ?tp(notice, "starting client", Opts),
{ok, Client} = emqtt:start_link(Opts), {ok, Client} = emqtt:start_link(maps:to_list(Opts)),
unlink(Client), unlink(Client),
on_exit(fun() -> catch emqtt:stop(Client) end), on_exit(fun() -> catch emqtt:stop(Client) end),
Client. Client.
@ -186,33 +186,6 @@ list_all_pubranges(Node) ->
%% Testcases %% Testcases
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
t_non_persistent_session_subscription(_Config) ->
ClientId = atom_to_binary(?FUNCTION_NAME),
SubTopicFilter = <<"t/#">>,
?check_trace(
#{timetrap => 30_000},
begin
?tp(notice, "starting", #{}),
Client = start_client(#{
clientid => ClientId,
properties => #{'Session-Expiry-Interval' => 0}
}),
{ok, _} = emqtt:connect(Client),
?tp(notice, "subscribing", #{}),
{ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(Client, SubTopicFilter, qos2),
ok = emqtt:stop(Client),
ok
end,
fun(Trace) ->
ct:pal("trace:\n ~p", [Trace]),
?assertEqual([], ?of_kind(ds_session_subscription_added, Trace)),
ok
end
),
ok.
t_session_subscription_idempotency(Config) -> t_session_subscription_idempotency(Config) ->
[Node1Spec | _] = ?config(node_specs, Config), [Node1Spec | _] = ?config(node_specs, Config),
[Node1] = ?config(nodes, Config), [Node1] = ?config(nodes, Config),
@ -222,7 +195,6 @@ t_session_subscription_idempotency(Config) ->
?check_trace( ?check_trace(
#{timetrap => 30_000}, #{timetrap => 30_000},
begin begin
#{timetrap => 20_000},
?force_ordering( ?force_ordering(
#{?snk_kind := persistent_session_ds_subscription_added}, #{?snk_kind := persistent_session_ds_subscription_added},
_NEvents0 = 1, _NEvents0 = 1,
@ -553,14 +525,14 @@ t_session_gc(Config) ->
?check_trace( ?check_trace(
#{timetrap => 30_000}, #{timetrap => 30_000},
begin begin
ClientId0 = <<"session_gc0">>,
Client0 = StartClient(ClientId0, Port1, 30),
ClientId1 = <<"session_gc1">>, ClientId1 = <<"session_gc1">>,
Client1 = StartClient(ClientId1, Port2, 1), Client1 = StartClient(ClientId1, Port1, 30),
ClientId2 = <<"session_gc2">>, ClientId2 = <<"session_gc2">>,
Client2 = StartClient(ClientId2, Port3, 1), Client2 = StartClient(ClientId2, Port2, 1),
ClientId3 = <<"session_gc3">>,
Client3 = StartClient(ClientId3, Port3, 1),
lists:foreach( lists:foreach(
fun(Client) -> fun(Client) ->
@ -570,52 +542,41 @@ t_session_gc(Config) ->
{ok, _} = emqtt:publish(Client, Topic, Payload, ?QOS_1), {ok, _} = emqtt:publish(Client, Topic, Payload, ?QOS_1),
ok ok
end, end,
[Client0, Client1, Client2] [Client1, Client2, Client3]
), ),
%% Clients are still alive; no session is garbage collected. %% Clients are still alive; no session is garbage collected.
Res0 = ?block_until(
#{
?snk_kind := ds_session_gc,
?snk_span := {complete, _},
?snk_meta := #{node := N}
} when
N =/= node(),
3 * GCInterval + 1_000
),
?assertMatch({ok, _}, Res0),
{ok, #{?snk_meta := #{time := T0}}} = Res0,
?assertMatch([_, _, _], list_all_sessions(Node1), sessions),
?assertMatch([_, _, _], list_all_subscriptions(Node1), subscriptions),
%% Now we disconnect 2 of them; only those should be GC'ed.
?assertMatch(
{ok, {ok, _}},
?wait_async_action(
emqtt:stop(Client1),
#{?snk_kind := terminate},
1_000
)
),
ct:pal("disconnected client1"),
?assertMatch(
{ok, {ok, _}},
?wait_async_action(
emqtt:stop(Client2),
#{?snk_kind := terminate},
1_000
)
),
ct:pal("disconnected client2"),
?assertMatch( ?assertMatch(
{ok, _}, {ok, _},
?block_until( ?block_until(
#{ #{
?snk_kind := ds_session_gc_cleaned, ?snk_kind := ds_session_gc,
session_id := ClientId1 ?snk_span := {complete, _},
} ?snk_meta := #{node := N}
} when N =/= node()
) )
), ),
?assertMatch([_, _, _], list_all_sessions(Node1), sessions),
?assertMatch([_, _, _], list_all_subscriptions(Node1), subscriptions),
%% Now we disconnect 2 of them; only those should be GC'ed.
?assertMatch(
{ok, {ok, _}},
?wait_async_action(
emqtt:stop(Client2),
#{?snk_kind := terminate}
)
),
?tp(notice, "disconnected client1", #{}),
?assertMatch(
{ok, {ok, _}},
?wait_async_action(
emqtt:stop(Client3),
#{?snk_kind := terminate}
)
),
?tp(notice, "disconnected client2", #{}),
?assertMatch( ?assertMatch(
{ok, _}, {ok, _},
?block_until( ?block_until(
@ -625,7 +586,16 @@ t_session_gc(Config) ->
} }
) )
), ),
?assertMatch([_], list_all_sessions(Node1), sessions), ?assertMatch(
{ok, _},
?block_until(
#{
?snk_kind := ds_session_gc_cleaned,
session_id := ClientId3
}
)
),
?assertMatch([ClientId1], list_all_sessions(Node1), sessions),
?assertMatch([_], list_all_subscriptions(Node1), subscriptions), ?assertMatch([_], list_all_subscriptions(Node1), subscriptions),
ok ok
end, end,

View File

@ -79,7 +79,7 @@
do_ensure_all_iterators_closed/1 do_ensure_all_iterators_closed/1
]). ]).
-export([print_session/1]). -export([print_session/1, seqno_diff/4]).
-ifdef(TEST). -ifdef(TEST).
-export([ -export([
@ -152,8 +152,7 @@
inflight_cnt, inflight_cnt,
inflight_max, inflight_max,
mqueue_len, mqueue_len,
mqueue_dropped, mqueue_dropped
awaiting_rel_cnt
]). ]).
%% %%
@ -233,8 +232,8 @@ info(mqueue_dropped, _Session) ->
%% PacketId; %% PacketId;
% info(awaiting_rel, #sessmem{awaiting_rel = AwaitingRel}) -> % info(awaiting_rel, #sessmem{awaiting_rel = AwaitingRel}) ->
% AwaitingRel; % AwaitingRel;
info(awaiting_rel_cnt, #{s := S}) -> %% info(awaiting_rel_cnt, #{s := S}) ->
seqno_diff(?QOS_2, ?rec, ?committed(?QOS_2), S); %% seqno_diff(?QOS_2, ?rec, ?committed(?QOS_2), S);
info(awaiting_rel_max, #{props := Conf}) -> info(awaiting_rel_max, #{props := Conf}) ->
maps:get(max_awaiting_rel, Conf); maps:get(max_awaiting_rel, Conf);
info(await_rel_timeout, #{props := Conf}) -> info(await_rel_timeout, #{props := Conf}) ->
@ -271,6 +270,8 @@ subscribe(
) -> ) ->
case subs_lookup(TopicFilter, S0) of case subs_lookup(TopicFilter, S0) of
undefined -> undefined ->
%% TODO: max subscriptions
%% N.B.: we chose to update the router before adding the %% N.B.: we chose to update the router before adding the
%% subscription to the session/iterator table. The %% subscription to the session/iterator table. The
%% reasoning for this is as follows: %% reasoning for this is as follows:
@ -511,16 +512,21 @@ replay_batch(Srs0, Session, ClientInfo) ->
-spec disconnect(session(), emqx_types:conninfo()) -> {shutdown, session()}. -spec disconnect(session(), emqx_types:conninfo()) -> {shutdown, session()}.
disconnect(Session = #{s := S0}, ConnInfo) -> disconnect(Session = #{s := S0}, ConnInfo) ->
OldConnInfo = emqx_persistent_session_ds_state:get_conninfo(S0),
NewConnInfo = maps:merge(OldConnInfo, maps:with([expiry_interval], ConnInfo)),
S1 = emqx_persistent_session_ds_state:set_last_alive_at(now_ms(), S0), S1 = emqx_persistent_session_ds_state:set_last_alive_at(now_ms(), S0),
S2 = emqx_persistent_session_ds_state:set_conninfo(NewConnInfo, S1), S2 =
case ConnInfo of
#{expiry_interval := EI} when is_number(EI) ->
emqx_persistent_session_ds_state:set_expiry_interval(EI, S1);
_ ->
S1
end,
S = emqx_persistent_session_ds_state:commit(S2), S = emqx_persistent_session_ds_state:commit(S2),
{shutdown, Session#{s => S}}. {shutdown, Session#{s => S}}.
-spec terminate(Reason :: term(), session()) -> ok. -spec terminate(Reason :: term(), session()) -> ok.
terminate(_Reason, _Session = #{s := S}) -> terminate(_Reason, _Session = #{id := Id, s := S}) ->
_ = emqx_persistent_session_ds_state:commit(S), _ = emqx_persistent_session_ds_state:commit(S),
?tp(debug, persistent_session_ds_terminate, #{id => Id}),
ok. ok.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -558,7 +564,7 @@ session_open(SessionId, NewConnInfo) ->
NowMS = now_ms(), NowMS = now_ms(),
case emqx_persistent_session_ds_state:open(SessionId) of case emqx_persistent_session_ds_state:open(SessionId) of
{ok, S0} -> {ok, S0} ->
EI = expiry_interval(emqx_persistent_session_ds_state:get_conninfo(S0)), EI = emqx_persistent_session_ds_state:get_expiry_interval(S0),
LastAliveAt = emqx_persistent_session_ds_state:get_last_alive_at(S0), LastAliveAt = emqx_persistent_session_ds_state:get_last_alive_at(S0),
case NowMS >= LastAliveAt + EI of case NowMS >= LastAliveAt + EI of
true -> true ->
@ -567,7 +573,7 @@ session_open(SessionId, NewConnInfo) ->
false -> false ->
?tp(open_session, #{ei => EI, now => NowMS, laa => LastAliveAt}), ?tp(open_session, #{ei => EI, now => NowMS, laa => LastAliveAt}),
%% New connection being established %% New connection being established
S1 = emqx_persistent_session_ds_state:set_conninfo(NewConnInfo, S0), S1 = emqx_persistent_session_ds_state:set_expiry_interval(EI, S0),
S2 = emqx_persistent_session_ds_state:set_last_alive_at(NowMS, S1), S2 = emqx_persistent_session_ds_state:set_last_alive_at(NowMS, S1),
S = emqx_persistent_session_ds_state:commit(S2), S = emqx_persistent_session_ds_state:commit(S2),
Inflight = emqx_persistent_session_ds_inflight:new( Inflight = emqx_persistent_session_ds_inflight:new(
@ -587,9 +593,10 @@ session_open(SessionId, NewConnInfo) ->
-spec session_ensure_new(id(), emqx_types:conninfo(), emqx_session:conf()) -> -spec session_ensure_new(id(), emqx_types:conninfo(), emqx_session:conf()) ->
session(). session().
session_ensure_new(Id, ConnInfo, Conf) -> session_ensure_new(Id, ConnInfo, Conf) ->
?tp(debug, persistent_session_ds_ensure_new, #{id => Id}),
Now = now_ms(), Now = now_ms(),
S0 = emqx_persistent_session_ds_state:create_new(Id), S0 = emqx_persistent_session_ds_state:create_new(Id),
S1 = emqx_persistent_session_ds_state:set_conninfo(ConnInfo, S0), S1 = emqx_persistent_session_ds_state:set_expiry_interval(expiry_interval(ConnInfo), S0),
S2 = bump_last_alive(S1), S2 = bump_last_alive(S1),
S3 = emqx_persistent_session_ds_state:set_created_at(Now, S2), S3 = emqx_persistent_session_ds_state:set_created_at(Now, S2),
S4 = lists:foldl( S4 = lists:foldl(
@ -970,8 +977,7 @@ inc_seqno(Qos, SeqNo) ->
NextSeqno NextSeqno
end. end.
%% Note: we use the least significant bit to store the QoS. Even %% Note: we use the most significant bit to store the QoS.
%% packet IDs are QoS1, odd packet IDs are QoS2.
seqno_to_packet_id(?QOS_1, SeqNo) -> seqno_to_packet_id(?QOS_1, SeqNo) ->
SeqNo band ?PACKET_ID_MASK; SeqNo band ?PACKET_ID_MASK;
seqno_to_packet_id(?QOS_2, SeqNo) -> seqno_to_packet_id(?QOS_2, SeqNo) ->

View File

@ -65,4 +65,11 @@
last_seqno_qos2 = 0 :: emqx_persistent_session_ds:seqno() last_seqno_qos2 = 0 :: emqx_persistent_session_ds:seqno()
}). }).
%% Session metadata keys:
-define(created_at, created_at).
-define(last_alive_at, last_alive_at).
-define(expiry_interval, expiry_interval).
%% Unique integer used to create unique identities
-define(last_id, last_id).
-endif. -endif.

View File

@ -69,7 +69,7 @@ handle_info(_Info, State) ->
{noreply, State}. {noreply, State}.
%%-------------------------------------------------------------------------------- %%--------------------------------------------------------------------------------
%% Internal fns %% Internal functions
%%-------------------------------------------------------------------------------- %%--------------------------------------------------------------------------------
ensure_gc_timer() -> ensure_gc_timer() ->
@ -116,22 +116,21 @@ gc_loop(MinLastAlive, It0) ->
{[], _It} -> {[], _It} ->
ok; ok;
{Sessions, It} -> {Sessions, It} ->
[ [do_gc(SessionId, MinLastAlive, Metadata) || {SessionId, Metadata} <- Sessions],
do_gc(SessionId, MinLastAlive, LastAliveAt, EI)
|| {SessionId, #{last_alive_at := LastAliveAt, conninfo := #{expiry_interval := EI}}} <-
Sessions
],
gc_loop(MinLastAlive, It) gc_loop(MinLastAlive, It)
end. end.
do_gc(SessionId, MinLastAlive, LastAliveAt, EI) when LastAliveAt + EI < MinLastAlive -> do_gc(SessionId, MinLastAlive, Metadata) ->
#{?last_alive_at := LastAliveAt, ?expiry_interval := EI} = Metadata,
case LastAliveAt + EI < MinLastAlive of
true ->
emqx_persistent_session_ds:destroy_session(SessionId), emqx_persistent_session_ds:destroy_session(SessionId),
?tp(debug, ds_session_gc_cleaned, #{ ?tp(debug, ds_session_gc_cleaned, #{
session_id => SessionId, session_id => SessionId,
last_alive_at => LastAliveAt, last_alive_at => LastAliveAt,
expiry_interval => EI, expiry_interval => EI,
min_last_alive => MinLastAlive min_last_alive => MinLastAlive
}), });
ok; false ->
do_gc(_SessionId, _MinLastAliveAt, _LastAliveAt, _EI) -> ok
ok. end.

View File

@ -29,7 +29,7 @@
-export([open/1, create_new/1, delete/1, commit/1, format/1, print_session/1, list_sessions/0]). -export([open/1, create_new/1, delete/1, commit/1, format/1, print_session/1, list_sessions/0]).
-export([get_created_at/1, set_created_at/2]). -export([get_created_at/1, set_created_at/2]).
-export([get_last_alive_at/1, set_last_alive_at/2]). -export([get_last_alive_at/1, set_last_alive_at/2]).
-export([get_conninfo/1, set_conninfo/2]). -export([get_expiry_interval/1, set_expiry_interval/2]).
-export([new_id/1]). -export([new_id/1]).
-export([get_stream/2, put_stream/3, del_stream/2, fold_streams/3]). -export([get_stream/2, put_stream/3, del_stream/2, fold_streams/3]).
-export([get_seqno/2, put_seqno/3]). -export([get_seqno/2, put_seqno/3]).
@ -81,18 +81,11 @@
dirty :: #{K => dirty | del} dirty :: #{K => dirty | del}
}. }.
%% Session metadata:
-define(created_at, created_at).
-define(last_alive_at, last_alive_at).
-define(conninfo, conninfo).
%% Unique integer used to create unique identities
-define(last_id, last_id).
-type metadata() :: -type metadata() ::
#{ #{
?created_at => emqx_persistent_session_ds:timestamp(), ?created_at => emqx_persistent_session_ds:timestamp(),
?last_alive_at => emqx_persistent_session_ds:timestamp(), ?last_alive_at => emqx_persistent_session_ds:timestamp(),
?conninfo => emqx_types:conninfo(), ?expiry_interval => emqx_types:conninfo(),
?last_id => integer() ?last_id => integer()
}. }.
@ -122,6 +115,7 @@
-define(rank_tab, emqx_ds_session_ranks). -define(rank_tab, emqx_ds_session_ranks).
-define(pmap_tables, [?stream_tab, ?seqno_tab, ?rank_tab, ?subscription_tab]). -define(pmap_tables, [?stream_tab, ?seqno_tab, ?rank_tab, ?subscription_tab]).
%% Enable this flag if you suspect some code breaks the sequence:
-ifndef(CHECK_SEQNO). -ifndef(CHECK_SEQNO).
-define(set_dirty, dirty => true). -define(set_dirty, dirty => true).
-define(unset_dirty, dirty => false). -define(unset_dirty, dirty => false).
@ -268,13 +262,13 @@ get_last_alive_at(Rec) ->
set_last_alive_at(Val, Rec) -> set_last_alive_at(Val, Rec) ->
set_meta(?last_alive_at, Val, Rec). set_meta(?last_alive_at, Val, Rec).
-spec get_conninfo(t()) -> emqx_types:conninfo() | undefined. -spec get_expiry_interval(t()) -> non_neg_integer() | undefined.
get_conninfo(Rec) -> get_expiry_interval(Rec) ->
get_meta(?conninfo, Rec). get_meta(?expiry_interval, Rec).
-spec set_conninfo(emqx_types:conninfo(), t()) -> t(). -spec set_expiry_interval(non_neg_integer(), t()) -> t().
set_conninfo(Val, Rec) -> set_expiry_interval(Val, Rec) ->
set_meta(?conninfo, Val, Rec). set_meta(?expiry_interval, Val, Rec).
-spec new_id(t()) -> {emqx_persistent_session_ds:subscription_id(), t()}. -spec new_id(t()) -> {emqx_persistent_session_ds:subscription_id(), t()}.
new_id(Rec) -> new_id(Rec) ->

View File

@ -201,12 +201,10 @@ remove_fully_replayed_streams(S0) ->
Groups = emqx_persistent_session_ds_state:fold_streams( Groups = emqx_persistent_session_ds_state:fold_streams(
fun({SubId, _Stream}, StreamState = #srs{rank_x = RankX, rank_y = RankY}, Acc) -> fun({SubId, _Stream}, StreamState = #srs{rank_x = RankX, rank_y = RankY}, Acc) ->
Key = {SubId, RankX}, Key = {SubId, RankX},
case case is_fully_replayed(CommQos1, CommQos2, StreamState) of
{maps:get(Key, Acc, undefined), is_fully_replayed(CommQos1, CommQos2, StreamState)} true when is_map_key(Key, Acc) ->
of
{undefined, true} ->
Acc#{Key => {true, RankY}}; Acc#{Key => {true, RankY}};
{_, false} -> true ->
Acc#{Key => false}; Acc#{Key => false};
_ -> _ ->
Acc Acc

View File

@ -74,6 +74,7 @@ init_per_group(persistence_enabled, Config) ->
{emqx_config, {emqx_config,
"session_persistence {\n" "session_persistence {\n"
" enable = true\n" " enable = true\n"
" last_alive_update_interval = 100ms\n"
" renew_streams_interval = 100ms\n" " renew_streams_interval = 100ms\n"
"}"}, "}"},
{persistence, ds} {persistence, ds}
@ -534,6 +535,8 @@ t_process_dies_session_expires(Config) ->
%% Emulate an error in the connect process, %% Emulate an error in the connect process,
%% or that the node of the process goes down. %% or that the node of the process goes down.
%% A persistent session should eventually expire. %% A persistent session should eventually expire.
?check_trace(
begin
ConnFun = ?config(conn_fun, Config), ConnFun = ?config(conn_fun, Config),
ClientId = ?config(client_id, Config), ClientId = ?config(client_id, Config),
Topic = ?config(topic, Config), Topic = ?config(topic, Config),
@ -554,7 +557,7 @@ t_process_dies_session_expires(Config) ->
ok = publish(Topic, Payload), ok = publish(Topic, Payload),
timer:sleep(2000), timer:sleep(1500),
{ok, Client2} = emqtt:start_link([ {ok, Client2} = emqtt:start_link([
{proto_ver, v5}, {proto_ver, v5},
@ -569,7 +572,10 @@ t_process_dies_session_expires(Config) ->
%% We should not receive the pending message %% We should not receive the pending message
?assertEqual([], receive_messages(1)), ?assertEqual([], receive_messages(1)),
emqtt:disconnect(Client2). emqtt:disconnect(Client2)
end,
[]
).
t_publish_while_client_is_gone_qos1(Config) -> t_publish_while_client_is_gone_qos1(Config) ->
%% A persistent session should receive messages in its %% A persistent session should receive messages in its
@ -644,7 +650,7 @@ t_publish_many_while_client_is_gone_qos1(Config) ->
#mqtt_msg{topic = <<"loc/1/2/42">>, payload = <<"M4">>, qos = 1}, #mqtt_msg{topic = <<"loc/1/2/42">>, payload = <<"M4">>, qos = 1},
#mqtt_msg{topic = <<"t/42/foo">>, payload = <<"M5">>, qos = 1}, #mqtt_msg{topic = <<"t/42/foo">>, payload = <<"M5">>, qos = 1},
#mqtt_msg{topic = <<"loc/3/4/5">>, payload = <<"M6">>, qos = 1}, #mqtt_msg{topic = <<"loc/3/4/5">>, payload = <<"M6">>, qos = 1},
#mqtt_msg{topic = <<"msg/feed/me2">>, payload = <<"M7">>, qos = 1} #mqtt_msg{topic = <<"msg/feed/me">>, payload = <<"M7">>, qos = 1}
], ],
ok = publish_many(Pubs1), ok = publish_many(Pubs1),
NPubs1 = length(Pubs1), NPubs1 = length(Pubs1),

View File

@ -4,4 +4,4 @@ Reduce RAM usage and frequency of database requests.
- Introduce dirty session state to avoid frequent mria transactions - Introduce dirty session state to avoid frequent mria transactions
- Introduce an intermediate buffer for the persistent messages - Introduce an intermediate buffer for the persistent messages
- Use separate tracks of PacketIds for QoS1 and QoS2 messages - Use separate tracks of PacketIds for QoS1 and QoS2 messages
- Limit the number of continuous ranges of infligtht messages to one per stream - Limit the number of continuous ranges of inflight messages to one per stream