diff --git a/apps/emqx/src/emqx_persistent_session_ds_state.erl b/apps/emqx/src/emqx_persistent_session_ds_state.erl index 27519678d..6e03a1c32 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_state.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_state.erl @@ -120,7 +120,7 @@ -define(rank_tab, emqx_ds_session_ranks). -define(pmap_tables, [?stream_tab, ?seqno_tab, ?rank_tab, ?subscription_tab]). --ifndef(TEST). +-ifndef(CHECK_SEQNO). -define(set_dirty, dirty => true). -define(unset_dirty, dirty => false). -else. @@ -562,7 +562,7 @@ ro_transaction(Fun) -> -compile({inline, check_sequence/1}). --ifdef(TEST). +-ifdef(CHECK_SEQNO). do_seqno() -> case erlang:get(?MODULE) of undefined -> diff --git a/apps/emqx/test/emqx_persistent_session_ds_state_tests.erl b/apps/emqx/test/emqx_persistent_session_ds_state_tests.erl new file mode 100644 index 000000000..35554829a --- /dev/null +++ b/apps/emqx/test/emqx_persistent_session_ds_state_tests.erl @@ -0,0 +1,372 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_persistent_session_ds_state_tests). + +-compile(nowarn_export_all). +-compile(export_all). + +-include_lib("proper/include/proper.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +-define(tab, ?MODULE). + +%%================================================================================ +%% Type declarations +%%================================================================================ + +-record(s, {subs = #{}, metadata = #{}, streams = #{}, seqno = #{}, committed = false}). + +-type state() :: #{emqx_persistent_session_ds:id() => #s{}}. + +%%================================================================================ +%% Properties +%%================================================================================ + +seqno_proper_test_() -> + Props = [prop_consistency()], + Opts = [{numtests, 10}, {to_file, user}, {max_size, 100}], + {timeout, 300, [?_assert(proper:quickcheck(Prop, Opts)) || Prop <- Props]}. + +prop_consistency() -> + ?FORALL( + Cmds, + commands(?MODULE), + ?TRAPEXIT( + begin + init(), + {_History, State, Result} = run_commands(?MODULE, Cmds), + clean(), + ?WHENFAIL( + io:format( + user, + "Operations: ~p~nState: ~p\nResult: ~p~n", + [Cmds, State, Result] + ), + aggregate(command_names(Cmds), Result =:= ok) + ) + end + ) + ). + +%%================================================================================ +%% Generators +%%================================================================================ + +-define(n_sessions, 10). + +session_id() -> + oneof([integer_to_binary(I) || I <- lists:seq(1, ?n_sessions)]). + +topic() -> + oneof([<<"foo">>, <<"bar">>, <<"foo/#">>, <<"//+/#">>]). + +subid() -> + oneof([[]]). + +subscription() -> + oneof([#{}]). + +session_id(S) -> + oneof(maps:keys(S)). + +batch_size() -> + range(1, ?n_sessions). + +put_metadata() -> + oneof([ + ?LET( + Val, + range(0, 100), + {last_alive_at, set_last_alive_at, Val} + ), + ?LET( + Val, + range(0, 100), + {created_at, set_created_at, Val} + ) + ]). + +get_metadata() -> + oneof([ + {last_alive_at, get_last_alive_at}, + {created_at, get_created_at} + ]). + +seqno_track() -> + range(0, 1). + +seqno() -> + range(1, 100). + +stream_id() -> + range(1, 1). + +stream() -> + oneof([#{}]). + +put_req() -> + oneof([ + ?LET( + {Id, Stream}, + {stream_id(), stream()}, + {#s.streams, put_stream, Id, Stream} + ), + ?LET( + {Track, Seqno}, + {seqno_track(), seqno()}, + {#s.seqno, put_seqno, Track, Seqno} + ) + ]). + +get_req() -> + oneof([ + {#s.streams, get_stream, stream_id()}, + {#s.seqno, get_seqno, seqno_track()} + ]). + +del_req() -> + oneof([ + {#s.streams, del_stream, stream_id()} + ]). + +command(S) -> + case maps:size(S) > 0 of + true -> + frequency([ + %% Global CRUD operations: + {1, {call, ?MODULE, create_new, [session_id()]}}, + {1, {call, ?MODULE, delete, [session_id(S)]}}, + {2, {call, ?MODULE, reopen, [session_id(S)]}}, + {2, {call, ?MODULE, commit, [session_id(S)]}}, + + %% Subscriptions: + {3, + {call, ?MODULE, put_subscription, [ + session_id(S), topic(), subid(), subscription() + ]}}, + {3, {call, ?MODULE, del_subscription, [session_id(S), topic(), subid()]}}, + + %% Metadata: + {3, {call, ?MODULE, put_metadata, [session_id(S), put_metadata()]}}, + {3, {call, ?MODULE, get_metadata, [session_id(S), get_metadata()]}}, + + %% Key-value: + {3, {call, ?MODULE, gen_put, [session_id(S), put_req()]}}, + {3, {call, ?MODULE, gen_get, [session_id(S), get_req()]}}, + {3, {call, ?MODULE, gen_del, [session_id(S), del_req()]}}, + + %% Getters: + {4, {call, ?MODULE, get_subscriptions, [session_id(S)]}}, + {1, {call, ?MODULE, iterate_sessions, [batch_size()]}} + ]); + false -> + frequency([ + {1, {call, ?MODULE, create_new, [session_id()]}}, + {1, {call, ?MODULE, iterate_sessions, [batch_size()]}} + ]) + end. + +precondition(_, _) -> + true. + +postcondition(S, {call, ?MODULE, iterate_sessions, [_]}, Result) -> + {Sessions, _} = lists:unzip(Result), + %% No lingering sessions: + ?assertMatch([], Sessions -- maps:keys(S)), + %% All committed sessions are visited by the iterator: + CommittedSessions = lists:sort([K || {K, #s{committed = true}} <- maps:to_list(S)]), + ?assertMatch([], CommittedSessions -- Sessions), + true; +postcondition(S, {call, ?MODULE, get_metadata, [SessionId, {MetaKey, _Fun}]}, Result) -> + #{SessionId := #s{metadata = Meta}} = S, + ?assertEqual( + maps:get(MetaKey, Meta, undefined), + Result, + #{session_id => SessionId, meta => MetaKey} + ), + true; +postcondition(S, {call, ?MODULE, gen_get, [SessionId, {Idx, Fun, Key}]}, Result) -> + #{SessionId := Record} = S, + ?assertEqual( + maps:get(Key, element(Idx, Record), undefined), + Result, + #{session_id => SessionId, key => Key, 'fun' => Fun} + ), + true; +postcondition(S, {call, ?MODULE, get_subscriptions, [SessionId]}, Result) -> + #{SessionId := #s{subs = Subs}} = S, + ?assertEqual(maps:size(Subs), emqx_topic_gbt:size(Result)), + maps:foreach( + fun({TopicFilter, Id}, Expected) -> + ?assertEqual( + Expected, + emqx_topic_gbt:lookup(TopicFilter, Id, Result, default) + ) + end, + Subs + ), + true; +postcondition(_, _, _) -> + true. + +next_state(S, _V, {call, ?MODULE, create_new, [SessionId]}) -> + S#{SessionId => #s{}}; +next_state(S, _V, {call, ?MODULE, delete, [SessionId]}) -> + maps:remove(SessionId, S); +next_state(S, _V, {call, ?MODULE, put_subscription, [SessionId, TopicFilter, SubId, Subscription]}) -> + Key = {TopicFilter, SubId}, + update( + SessionId, + #s.subs, + fun(Subs) -> Subs#{Key => Subscription} end, + S + ); +next_state(S, _V, {call, ?MODULE, del_subscription, [SessionId, TopicFilter, SubId]}) -> + Key = {TopicFilter, SubId}, + update( + SessionId, + #s.subs, + fun(Subs) -> maps:remove(Key, Subs) end, + S + ); +next_state(S, _V, {call, ?MODULE, put_metadata, [SessionId, {Key, _Fun, Val}]}) -> + update( + SessionId, + #s.metadata, + fun(Map) -> Map#{Key => Val} end, + S + ); +next_state(S, _V, {call, ?MODULE, gen_put, [SessionId, {Idx, _Fun, Key, Val}]}) -> + update( + SessionId, + Idx, + fun(Map) -> Map#{Key => Val} end, + S + ); +next_state(S, _V, {call, ?MODULE, gen_del, [SessionId, {Idx, _Fun, Key}]}) -> + update( + SessionId, + Idx, + fun(Map) -> maps:remove(Key, Map) end, + S + ); +next_state(S, _V, {call, ?MODULE, commit, [SessionId]}) -> + update( + SessionId, + #s.committed, + fun(_) -> true end, + S + ); +next_state(S, _V, {call, ?MODULE, _, _}) -> + S. + +initial_state() -> + #{}. + +%%================================================================================ +%% Operations +%%================================================================================ + +create_new(SessionId) -> + put_state(SessionId, emqx_persistent_session_ds_state:create_new(SessionId)). + +delete(SessionId) -> + emqx_persistent_session_ds_state:delete(SessionId), + ets:delete(?tab, SessionId). + +commit(SessionId) -> + put_state(SessionId, emqx_persistent_session_ds_state:commit(get_state(SessionId))). + +reopen(SessionId) -> + _ = emqx_persistent_session_ds_state:commit(get_state(SessionId)), + {ok, S} = emqx_persistent_session_ds_state:open(SessionId), + put_state(SessionId, S). + +put_subscription(SessionId, TopicFilter, SubId, Subscription) -> + S = emqx_persistent_session_ds_state:put_subscription( + TopicFilter, SubId, Subscription, get_state(SessionId) + ), + put_state(SessionId, S). + +del_subscription(SessionId, TopicFilter, SubId) -> + S = emqx_persistent_session_ds_state:del_subscription(TopicFilter, SubId, get_state(SessionId)), + put_state(SessionId, S). + +get_subscriptions(SessionId) -> + emqx_persistent_session_ds_state:get_subscriptions(get_state(SessionId)). + +put_metadata(SessionId, {_MetaKey, Fun, Value}) -> + S = apply(emqx_persistent_session_ds_state, Fun, [Value, get_state(SessionId)]), + put_state(SessionId, S). + +get_metadata(SessionId, {_MetaKey, Fun}) -> + apply(emqx_persistent_session_ds_state, Fun, [get_state(SessionId)]). + +gen_put(SessionId, {_Idx, Fun, Key, Value}) -> + S = apply(emqx_persistent_session_ds_state, Fun, [Key, Value, get_state(SessionId)]), + put_state(SessionId, S). + +gen_del(SessionId, {_Idx, Fun, Key}) -> + S = apply(emqx_persistent_session_ds_state, Fun, [Key, get_state(SessionId)]), + put_state(SessionId, S). + +gen_get(SessionId, {_Idx, Fun, Key}) -> + apply(emqx_persistent_session_ds_state, Fun, [Key, get_state(SessionId)]). + +iterate_sessions(BatchSize) -> + Fun = fun F(It0) -> + case emqx_persistent_session_ds_state:session_iterator_next(It0, BatchSize) of + {[], _} -> + []; + {Sessions, It} -> + Sessions ++ F(It) + end + end, + Fun(emqx_persistent_session_ds_state:make_session_iterator()). + +%%================================================================================ +%% Misc. +%%================================================================================ + +update(SessionId, Key, Fun, S) -> + maps:update_with( + SessionId, + fun(SS) -> + setelement(Key, SS, Fun(erlang:element(Key, SS))) + end, + S + ). + +get_state(SessionId) -> + case ets:lookup(?tab, SessionId) of + [{_, S}] -> + S; + [] -> + error({not_found, SessionId}) + end. + +put_state(SessionId, S) -> + ets:insert(?tab, {SessionId, S}). + +init() -> + _ = ets:new(?tab, [named_table, public, {keypos, 1}]), + mria:start(), + emqx_persistent_session_ds_state:create_tables(). + +clean() -> + ets:delete(?tab), + mria:stop(), + mria_mnesia:delete_schema().