diff --git a/apps/emqx/src/emqx_persistent_session.erl b/apps/emqx/src/emqx_persistent_session.erl index 13c74b62e..c5fab4170 100644 --- a/apps/emqx/src/emqx_persistent_session.erl +++ b/apps/emqx/src/emqx_persistent_session.erl @@ -18,6 +18,7 @@ -export([ is_store_enabled/0 , init_db_backend/0 + , storage_type/0 ]). -export([ discard/2 @@ -83,8 +84,15 @@ init_db_backend() -> case is_store_enabled() of true -> ok = emqx_trie:create_session_trie(), - emqx_persistent_session_mnesia_backend:create_tables(), - persistent_term:put(?db_backend_key, emqx_persistent_session_mnesia_backend), + ok = emqx_session_router:create_router_tab(), + case storage_type() of + disc -> + emqx_persistent_session_mnesia_disc_backend:create_tables(), + persistent_term:put(?db_backend_key, emqx_persistent_session_mnesia_disc_backend); + ram -> + emqx_persistent_session_mnesia_ram_backend:create_tables(), + persistent_term:put(?db_backend_key, emqx_persistent_session_mnesia_ram_backend) + end, ok; false -> persistent_term:put(?db_backend_key, emqx_persistent_session_dummy_backend), @@ -94,6 +102,9 @@ init_db_backend() -> is_store_enabled() -> emqx_config:get(?is_enabled_key). +storage_type() -> + emqx_config:get(?storage_type_key). + %%-------------------------------------------------------------------- %% Session message ADT API %%-------------------------------------------------------------------- diff --git a/apps/emqx/src/emqx_persistent_session.hrl b/apps/emqx/src/emqx_persistent_session.hrl index 4cb51160a..e973a8f2b 100644 --- a/apps/emqx/src/emqx_persistent_session.hrl +++ b/apps/emqx/src/emqx_persistent_session.hrl @@ -14,9 +14,14 @@ %% limitations under the License. %%-------------------------------------------------------------------- --define(SESSION_STORE, emqx_session_store). --define(SESS_MSG_TAB, emqx_session_msg). --define(MSG_TAB, emqx_persistent_msg). +-define(SESSION_STORE_DISC, emqx_session_store_disc). +-define(SESSION_STORE_RAM, emqx_session_store_ram). + +-define(SESS_MSG_TAB_DISC, emqx_session_msg_disc). +-define(SESS_MSG_TAB_RAM, emqx_session_msg_ram). + +-define(MSG_TAB_DISC, emqx_persistent_msg_disc). +-define(MSG_TAB_RAM, emqx_persistent_msg_ram). -record(session_store, { client_id :: binary() , expiry_interval :: non_neg_integer() @@ -28,6 +33,7 @@ -define(db_backend_key, [persistent_session_store, db_backend]). -define(is_enabled_key, [persistent_session_store, enabled]). +-define(storage_type_key, [persistent_session_store, storage_type]). -define(msg_retain, [persistent_session_store, max_retain_undelivered]). -define(db_backend, (persistent_term:get(?db_backend_key))). diff --git a/apps/emqx/src/emqx_persistent_session_mnesia_backend.erl b/apps/emqx/src/emqx_persistent_session_mnesia_disc_backend.erl similarity index 79% rename from apps/emqx/src/emqx_persistent_session_mnesia_backend.erl rename to apps/emqx/src/emqx_persistent_session_mnesia_disc_backend.erl index 512984845..d10649308 100644 --- a/apps/emqx/src/emqx_persistent_session_mnesia_backend.erl +++ b/apps/emqx/src/emqx_persistent_session_mnesia_disc_backend.erl @@ -14,7 +14,7 @@ %% limitations under the License. %%-------------------------------------------------------------------- --module(emqx_persistent_session_mnesia_backend). +-module(emqx_persistent_session_mnesia_disc_backend). -include("emqx.hrl"). -include("emqx_persistent_session.hrl"). @@ -36,7 +36,7 @@ ]). create_tables() -> - ok = mria:create_table(?SESSION_STORE, [ + ok = mria:create_table(?SESSION_STORE_DISC, [ {type, set}, {rlog_shard, ?PERSISTENT_SESSION_SHARD}, {storage, disc_copies}, @@ -44,7 +44,7 @@ create_tables() -> {attributes, record_info(fields, session_store)}, {storage_properties, [{ets, [{read_concurrency, true}]}]}]), - ok = mria:create_table(?SESS_MSG_TAB, [ + ok = mria:create_table(?SESS_MSG_TAB_DISC, [ {type, ordered_set}, {rlog_shard, ?PERSISTENT_SESSION_SHARD}, {storage, disc_copies}, @@ -53,7 +53,7 @@ create_tables() -> {storage_properties, [{ets, [{read_concurrency, true}, {write_concurrency, true}]}]}]), - ok = mria:create_table(?MSG_TAB, [ + ok = mria:create_table(?MSG_TAB_DISC, [ {type, ordered_set}, {rlog_shard, ?PERSISTENT_SESSION_SHARD}, {storage, disc_copies}, @@ -63,43 +63,43 @@ create_tables() -> {write_concurrency, true}]}]}]). first_session_message() -> - mnesia:dirty_first(?SESS_MSG_TAB). + mnesia:dirty_first(?SESS_MSG_TAB_DISC). next_session_message(Key) -> - mnesia:dirty_next(?SESS_MSG_TAB, Key). + mnesia:dirty_next(?SESS_MSG_TAB_DISC, Key). first_message_id() -> - mnesia:dirty_first(?MSG_TAB). + mnesia:dirty_first(?MSG_TAB_DISC). next_message_id(Key) -> - mnesia:dirty_next(?MSG_TAB, Key). + mnesia:dirty_next(?MSG_TAB_DISC, Key). delete_message(Key) -> - mria:dirty_delete(?MSG_TAB, Key). + mria:dirty_delete(?MSG_TAB_DISC, Key). delete_session_message(Key) -> - mria:dirty_delete(?SESS_MSG_TAB, Key). + mria:dirty_delete(?SESS_MSG_TAB_DISC, Key). put_session_store(SS) -> - mria:dirty_write(?SESSION_STORE, SS). + mria:dirty_write(?SESSION_STORE_DISC, SS). delete_session_store(ClientID) -> - mria:dirty_delete(?SESSION_STORE, ClientID). + mria:dirty_delete(?SESSION_STORE_DISC, ClientID). lookup_session_store(ClientID) -> - case mnesia:dirty_read(?SESSION_STORE, ClientID) of + case mnesia:dirty_read(?SESSION_STORE_DISC, ClientID) of [] -> none; [SS] -> {value, SS} end. put_session_message(SessMsg) -> - mria:dirty_write(?SESS_MSG_TAB, SessMsg). + mria:dirty_write(?SESS_MSG_TAB_DISC, SessMsg). put_message(Msg) -> - mria:dirty_write(?MSG_TAB, Msg). + mria:dirty_write(?MSG_TAB_DISC, Msg). get_message(MsgId) -> - case mnesia:read(?MSG_TAB, MsgId) of + case mnesia:read(?MSG_TAB_DISC, MsgId) of [] -> error({msg_not_found, MsgId}); [Msg] -> Msg end. diff --git a/apps/emqx/src/emqx_persistent_session_mnesia_ram_backend.erl b/apps/emqx/src/emqx_persistent_session_mnesia_ram_backend.erl new file mode 100644 index 000000000..f13d92a51 --- /dev/null +++ b/apps/emqx/src/emqx_persistent_session_mnesia_ram_backend.erl @@ -0,0 +1,110 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_persistent_session_mnesia_ram_backend). + +-include("emqx.hrl"). +-include("emqx_persistent_session.hrl"). + +-export([ create_tables/0 + , first_message_id/0 + , next_message_id/1 + , delete_message/1 + , first_session_message/0 + , next_session_message/1 + , delete_session_message/1 + , put_session_store/1 + , delete_session_store/1 + , lookup_session_store/1 + , put_session_message/1 + , put_message/1 + , get_message/1 + , ro_transaction/1 + ]). + +create_tables() -> + ok = mria:create_table(?SESSION_STORE_RAM, [ + {type, set}, + {rlog_shard, ?PERSISTENT_SESSION_SHARD}, + {storage, ram_copies}, + {record_name, session_store}, + {attributes, record_info(fields, session_store)}, + {storage_properties, [{ets, [{read_concurrency, true}]}]}]), + + ok = mria:create_table(?SESS_MSG_TAB_RAM, [ + {type, ordered_set}, + {rlog_shard, ?PERSISTENT_SESSION_SHARD}, + {storage, ram_copies}, + {record_name, session_msg}, + {attributes, record_info(fields, session_msg)}, + {storage_properties, [{ets, [{read_concurrency, true}, + {write_concurrency, true}]}]}]), + + ok = mria:create_table(?MSG_TAB_RAM, [ + {type, ordered_set}, + {rlog_shard, ?PERSISTENT_SESSION_SHARD}, + {storage, ram_copies}, + {record_name, message}, + {attributes, record_info(fields, message)}, + {storage_properties, [{ets, [{read_concurrency, true}, + {write_concurrency, true}]}]}]). + +first_session_message() -> + mnesia:dirty_first(?SESS_MSG_TAB_RAM). + +next_session_message(Key) -> + mnesia:dirty_next(?SESS_MSG_TAB_RAM, Key). + +first_message_id() -> + mnesia:dirty_first(?MSG_TAB_RAM). + +next_message_id(Key) -> + mnesia:dirty_next(?MSG_TAB_RAM, Key). + +delete_message(Key) -> + mria:dirty_delete(?MSG_TAB_RAM, Key). + +delete_session_message(Key) -> + mria:dirty_delete(?SESS_MSG_TAB_RAM, Key). + +put_session_store(SS) -> + mria:dirty_write(?SESSION_STORE_RAM, SS). + +delete_session_store(ClientID) -> + mria:dirty_delete(?SESSION_STORE_RAM, ClientID). + +lookup_session_store(ClientID) -> + case mnesia:dirty_read(?SESSION_STORE_RAM, ClientID) of + [] -> none; + [SS] -> {value, SS} + end. + +put_session_message(SessMsg) -> + mria:dirty_write(?SESS_MSG_TAB_RAM, SessMsg). + +put_message(Msg) -> + mria:dirty_write(?MSG_TAB_RAM, Msg). + +get_message(MsgId) -> + case mnesia:read(?MSG_TAB_RAM, MsgId) of + [] -> error({msg_not_found, MsgId}); + [Msg] -> Msg + end. + +ro_transaction(Fun) -> + {atomic, Res} = mria:ro_transaction(?PERSISTENT_SESSION_SHARD, Fun), + Res. + diff --git a/apps/emqx/src/emqx_router_utils.erl b/apps/emqx/src/emqx_router_utils.erl index c47f2e37b..3c7047306 100644 --- a/apps/emqx/src/emqx_router_utils.erl +++ b/apps/emqx/src/emqx_router_utils.erl @@ -20,8 +20,10 @@ -export([ delete_direct_route/2 , delete_trie_route/2 + , delete_session_trie_route/2 , insert_direct_route/2 , insert_trie_route/2 + , insert_session_trie_route/2 , maybe_trans/3 ]). @@ -30,8 +32,14 @@ insert_direct_route(Tab, Route) -> insert_trie_route(RouteTab, Route = #route{topic = Topic}) -> case mnesia:wread({RouteTab, Topic}) of - [] when RouteTab =:= emqx_route -> emqx_trie:insert(Topic); - [] when RouteTab =:= emqx_session_route -> emqx_trie:insert_session(Topic); + [] -> emqx_trie:insert(Topic); + _ -> ok + end, + mnesia:write(RouteTab, Route, sticky_write). + +insert_session_trie_route(RouteTab, Route = #route{topic = Topic}) -> + case mnesia:wread({RouteTab, Topic}) of + [] -> emqx_trie:insert_session(Topic); _ -> ok end, mnesia:write(RouteTab, Route, sticky_write). @@ -39,14 +47,20 @@ insert_trie_route(RouteTab, Route = #route{topic = Topic}) -> delete_direct_route(RouteTab, Route) -> mria:dirty_delete_object(RouteTab, Route). -delete_trie_route(RouteTab, Route = #route{topic = Topic}) -> +delete_trie_route(RouteTab, Route) -> + delete_trie_route(RouteTab, Route, normal). + +delete_session_trie_route(RouteTab, Route) -> + delete_trie_route(RouteTab, Route, session). + +delete_trie_route(RouteTab, Route = #route{topic = Topic}, Type) -> case mnesia:wread({RouteTab, Topic}) of [R] when R =:= Route -> %% Remove route and trie ok = mnesia:delete_object(RouteTab, Route, sticky_write), - case RouteTab of - emqx_route -> emqx_trie:delete(Topic); - emqx_session_route -> emqx_trie:delete_session(Topic) + case Type of + normal -> emqx_trie:delete(Topic); + session -> emqx_trie:delete_session(Topic) end; [_|_] -> %% Remove route only diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index b7f3c5690..4c96d30ca 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -163,6 +163,10 @@ fields("persistent_session_store") -> sc(boolean(), #{ default => "false" })}, + {"storage_type", + sc(hoconsc:union([ram, disc]), + #{ default => disc + })}, {"max_retain_undelivered", sc(duration(), #{ default => "1h" diff --git a/apps/emqx/src/emqx_session_router.erl b/apps/emqx/src/emqx_session_router.erl index 45c7ecd85..03e8a9cb2 100644 --- a/apps/emqx/src/emqx_session_router.erl +++ b/apps/emqx/src/emqx_session_router.erl @@ -24,12 +24,8 @@ -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -%% Mnesia bootstrap --export([mnesia/1]). - --boot_mnesia({mnesia, [boot]}). - -export([ create_init_tab/0 + , create_router_tab/0 , start_link/2]). %% Route APIs @@ -60,7 +56,8 @@ -type(dest() :: node() | {group(), node()}). --define(ROUTE_TAB, emqx_session_route). +-define(ROUTE_RAM_TAB, emqx_session_route_ram). +-define(ROUTE_DISC_TAB, emqx_session_route_disc). -define(SESSION_INIT_TAB, session_init_tab). @@ -68,13 +65,21 @@ %% Mnesia bootstrap %%-------------------------------------------------------------------- -mnesia(boot) -> - ok = mria:create_table(?ROUTE_TAB, [ +create_router_tab() -> + ok = mria:create_table(?ROUTE_DISC_TAB, [ {type, bag}, {rlog_shard, ?ROUTE_SHARD}, {storage, disc_copies}, {record_name, route}, {attributes, record_info(fields, route)}, + {storage_properties, [{ets, [{read_concurrency, true}, + {write_concurrency, true}]}]}]), + ok = mria:create_table(?ROUTE_RAM_TAB, [ + {type, bag}, + {rlog_shard, ?ROUTE_SHARD}, + {storage, ram_copies}, + {record_name, route}, + {attributes, record_info(fields, route)}, {storage_properties, [{ets, [{read_concurrency, true}, {write_concurrency, true}]}]}]). @@ -103,11 +108,11 @@ do_add_route(Topic, SessionID) when is_binary(Topic) -> false -> case emqx_topic:wildcard(Topic) of true -> - Fun = fun emqx_router_utils:insert_trie_route/2, - emqx_router_utils:maybe_trans(Fun, [?ROUTE_TAB, Route], + Fun = fun emqx_router_utils:insert_session_trie_route/2, + emqx_router_utils:maybe_trans(Fun, [route_tab(), Route], ?PERSISTENT_SESSION_SHARD); false -> - emqx_router_utils:insert_direct_route(?ROUTE_TAB, Route) + emqx_router_utils:insert_direct_route(route_tab(), Route) end end. @@ -136,10 +141,10 @@ do_delete_route(Topic, SessionID) -> Route = #route{topic = Topic, dest = SessionID}, case emqx_topic:wildcard(Topic) of true -> - Fun = fun emqx_router_utils:delete_trie_route/2, - emqx_router_utils:maybe_trans(Fun, [?ROUTE_TAB, Route], ?PERSISTENT_SESSION_SHARD); + Fun = fun emqx_router_utils:delete_session_trie_route/2, + emqx_router_utils:maybe_trans(Fun, [route_tab(), Route], ?PERSISTENT_SESSION_SHARD); false -> - emqx_router_utils:delete_direct_route(?ROUTE_TAB, Route) + emqx_router_utils:delete_direct_route(route_tab(), Route) end. %% @doc Print routes to a topic @@ -273,4 +278,10 @@ init_resume_worker(RemotePid, SessionID, #{ pmon := Pmon } = State) -> %%-------------------------------------------------------------------- lookup_routes(Topic) -> - ets:lookup(?ROUTE_TAB, Topic). + ets:lookup(route_tab(), Topic). + +route_tab() -> + case emqx_persistent_session:storage_type() of + disc -> ?ROUTE_DISC_TAB; + ram -> ?ROUTE_RAM_TAB + end. diff --git a/apps/emqx/src/emqx_trie.erl b/apps/emqx/src/emqx_trie.erl index 1881ecb6b..995515d9c 100644 --- a/apps/emqx/src/emqx_trie.erl +++ b/apps/emqx/src/emqx_trie.erl @@ -48,7 +48,8 @@ -endif. -define(TRIE, emqx_trie). --define(SESSION_TRIE, emqx_session_trie). +-define(SESSION_DISC_TRIE, emqx_session_trie_disc). +-define(SESSION_RAM_TRIE, emqx_session_trie_ram). -define(PREFIX(Prefix), {Prefix, 0}). -define(TOPIC(Topic), {Topic, 1}). @@ -79,12 +80,19 @@ create_session_trie() -> StoreProps = [{ets, [{read_concurrency, true}, {write_concurrency, true} ]}], - ok = mria:create_table(?SESSION_TRIE, + ok = mria:create_table(?SESSION_DISC_TRIE, [{rlog_shard, ?ROUTE_SHARD}, {storage, disc_copies}, {record_name, ?TRIE}, {attributes, record_info(fields, ?TRIE)}, {type, ordered_set}, + {storage_properties, StoreProps}]), + ok = mria:create_table(?SESSION_RAM_TRIE, + [{rlog_shard, ?ROUTE_SHARD}, + {storage, ram_copies}, + {record_name, ?TRIE}, + {attributes, record_info(fields, ?TRIE)}, + {type, ordered_set}, {storage_properties, StoreProps}]). %%-------------------------------------------------------------------- @@ -98,7 +106,7 @@ insert(Topic) when is_binary(Topic) -> -spec(insert_session(emqx_topic:topic()) -> ok). insert_session(Topic) when is_binary(Topic) -> - insert(Topic, ?SESSION_TRIE). + insert(Topic, session_trie()). insert(Topic, Trie) when is_binary(Topic) -> {TopicKey, PrefixKeys} = make_keys(Topic), @@ -115,7 +123,7 @@ delete(Topic) when is_binary(Topic) -> %% @doc Delete a topic filter from the trie. -spec(delete_session(emqx_topic:topic()) -> ok). delete_session(Topic) when is_binary(Topic) -> - delete(Topic, ?SESSION_TRIE). + delete(Topic, session_trie()). delete(Topic, Trie) when is_binary(Topic) -> {TopicKey, PrefixKeys} = make_keys(Topic), @@ -131,7 +139,7 @@ match(Topic) when is_binary(Topic) -> -spec(match_session(emqx_topic:topic()) -> list(emqx_topic:topic())). match_session(Topic) when is_binary(Topic) -> - match(Topic, ?SESSION_TRIE). + match(Topic, session_trie()). match(Topic, Trie) when is_binary(Topic) -> Words = emqx_topic:words(Topic), @@ -154,7 +162,7 @@ match(Topic, Trie) when is_binary(Topic) -> empty() -> empty(?TRIE). empty_session() -> - empty(?SESSION_TRIE). + empty(session_trie()). empty(Trie) -> ets:first(Trie) =:= '$end_of_table'. @@ -164,12 +172,19 @@ lock_tables() -> -spec lock_session_tables() -> ok. lock_session_tables() -> - mnesia:write_lock_table(?SESSION_TRIE). + mnesia:write_lock_table(session_trie()). %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- +session_trie() -> + case emqx_persistent_session:storage_type() of + disc -> ?SESSION_DISC_TRIE; + ram -> ?SESSION_RAM_TRIE + end. + + make_keys(Topic) -> Words = emqx_topic:words(Topic), {?TOPIC(Topic), [?PREFIX(Prefix) || Prefix <- make_prefixes(Words)]}. diff --git a/apps/emqx/test/emqx_persistent_session_SUITE.erl b/apps/emqx/test/emqx_persistent_session_SUITE.erl index d13d3ed8b..3233227ce 100644 --- a/apps/emqx/test/emqx_persistent_session_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_session_SUITE.erl @@ -50,13 +50,17 @@ groups() -> SnabbkaffeTCs = [TC || TC <- TCs, is_snabbkaffe_tc(TC)], GCTests = [TC || TC <- TCs, is_gc_tc(TC)], OtherTCs = (TCs -- SnabbkaffeTCs) -- GCTests, - [ {persistent_store_enabled, [ {group, no_kill_connection_process} - , {group, kill_connection_process} - , {group, snabbkaffe} - , {group, gc_tests} + [ {persistent_store_enabled, [ {group, ram_tables} + , {group, disc_tables} ]} , {persistent_store_disabled, [ {group, no_kill_connection_process} ]} + , { ram_tables, [], [ {group, no_kill_connection_process} + , {group, snabbkaffe} + , {group, gc_tests}]} + , { disc_tables, [], [ {group, no_kill_connection_process} + , {group, snabbkaffe} + , {group, gc_tests}]} , {no_kill_connection_process, [], [{group, tcp}, {group, quic}, {group, ws}]} , { kill_connection_process, [], [{group, tcp}, {group, quic}, {group, ws}]} , {snabbkaffe, [], [{group, tcp_snabbkaffe}, {group, quic_snabbkaffe}, {group, ws_snabbkaffe}]} @@ -76,15 +80,23 @@ is_gc_tc(TC) -> re:run(atom_to_list(TC), "^t_gc_") /= nomatch. init_per_group(persistent_store_enabled, Config) -> + [{persistent_store_enabled, true}|Config]; +init_per_group(Group, Config) when Group =:= ram_tables; Group =:= disc_tables -> %% Start Apps + Reply = case Group =:= ram_tables of + true -> ram; + false -> disc + end, emqx_common_test_helpers:boot_modules(all), meck:new(emqx_config, [non_strict, passthrough, no_history, no_link]), - meck:expect(emqx_config, get, fun(?is_enabled_key) -> true; - (Other) -> meck:passthrough([Other]) + meck:expect(emqx_config, get, fun(?storage_type_key) -> Reply; + (?is_enabled_key) -> true; + (Other) -> meck:passthrough([Other]) end), emqx_common_test_helpers:start_apps([], fun set_special_confs/1), ?assertEqual(true, emqx_persistent_session:is_store_enabled()), - [{persistent_store_enabled, true}|Config]; + ?assertEqual(Reply, emqx_persistent_session:storage_type()), + Config; init_per_group(persistent_store_disabled, Config) -> %% Start Apps emqx_common_test_helpers:boot_modules(all), @@ -125,15 +137,20 @@ init_per_group(gc_tests, Config) -> receive stop -> ok end end), meck:new(mnesia, [non_strict, passthrough, no_history, no_link]), - meck:expect(mnesia, dirty_first, fun(?SESS_MSG_TAB) -> ets:first(SessionMsgEts); - (?MSG_TAB) -> ets:first(MsgEts); - (X) -> meck:passthrough(X) + meck:expect(mnesia, dirty_first, fun(?SESS_MSG_TAB_RAM) -> ets:first(SessionMsgEts); + (?SESS_MSG_TAB_DISC) -> ets:first(SessionMsgEts); + (?MSG_TAB_RAM) -> ets:first(MsgEts); + (?MSG_TAB_DISC) -> ets:first(MsgEts); + (X) -> meck:passthrough([X]) end), - meck:expect(mnesia, dirty_next, fun(?SESS_MSG_TAB, X) -> ets:next(SessionMsgEts, X); - (?MSG_TAB, X) -> ets:next(MsgEts, X); + meck:expect(mnesia, dirty_next, fun(?SESS_MSG_TAB_RAM, X) -> ets:next(SessionMsgEts, X); + (?SESS_MSG_TAB_DISC, X) -> ets:next(SessionMsgEts, X); + (?MSG_TAB_RAM, X) -> ets:next(MsgEts, X); + (?MSG_TAB_DISC, X) -> ets:next(MsgEts, X); (Tab, X) -> meck:passthrough([Tab, X]) end), - meck:expect(mnesia, dirty_delete, fun(?MSG_TAB, X) -> ets:delete(MsgEts, X); + meck:expect(mnesia, dirty_delete, fun(?MSG_TAB_RAM, X) -> ets:delete(MsgEts, X); + (?MSG_TAB_DISC, X) -> ets:delete(MsgEts, X); (Tab, X) -> meck:passthrough([Tab, X]) end), [{store_owner, Pid}, {session_msg_store, SessionMsgEts}, {msg_store, MsgEts} | Config]. @@ -154,7 +171,7 @@ end_per_group(gc_tests, Config) -> meck:unload(mnesia), ?config(store_owner, Config) ! stop, ok; -end_per_group(persistent_store_enabled, _Config) -> +end_per_group(Group, _Config) when Group =:= ram_tables; Group =:= disc_tables -> meck:unload(emqx_config), emqx_common_test_helpers:stop_apps([]); end_per_group(persistent_store_disabled, _Config) ->