feat(persistent_sessions): add choice between ram or disc backends in mnesia

This commit is contained in:
Tobias Lindahl 2021-11-23 11:50:20 +01:00
parent ef0e440d27
commit 08acb5d435
9 changed files with 251 additions and 63 deletions

View File

@ -18,6 +18,7 @@
-export([ is_store_enabled/0
, init_db_backend/0
, storage_type/0
]).
-export([ discard/2
@ -83,8 +84,15 @@ init_db_backend() ->
case is_store_enabled() of
true ->
ok = emqx_trie:create_session_trie(),
emqx_persistent_session_mnesia_backend:create_tables(),
persistent_term:put(?db_backend_key, emqx_persistent_session_mnesia_backend),
ok = emqx_session_router:create_router_tab(),
case storage_type() of
disc ->
emqx_persistent_session_mnesia_disc_backend:create_tables(),
persistent_term:put(?db_backend_key, emqx_persistent_session_mnesia_disc_backend);
ram ->
emqx_persistent_session_mnesia_ram_backend:create_tables(),
persistent_term:put(?db_backend_key, emqx_persistent_session_mnesia_ram_backend)
end,
ok;
false ->
persistent_term:put(?db_backend_key, emqx_persistent_session_dummy_backend),
@ -94,6 +102,9 @@ init_db_backend() ->
is_store_enabled() ->
emqx_config:get(?is_enabled_key).
storage_type() ->
emqx_config:get(?storage_type_key).
%%--------------------------------------------------------------------
%% Session message ADT API
%%--------------------------------------------------------------------

View File

@ -14,9 +14,14 @@
%% limitations under the License.
%%--------------------------------------------------------------------
-define(SESSION_STORE, emqx_session_store).
-define(SESS_MSG_TAB, emqx_session_msg).
-define(MSG_TAB, emqx_persistent_msg).
-define(SESSION_STORE_DISC, emqx_session_store_disc).
-define(SESSION_STORE_RAM, emqx_session_store_ram).
-define(SESS_MSG_TAB_DISC, emqx_session_msg_disc).
-define(SESS_MSG_TAB_RAM, emqx_session_msg_ram).
-define(MSG_TAB_DISC, emqx_persistent_msg_disc).
-define(MSG_TAB_RAM, emqx_persistent_msg_ram).
-record(session_store, { client_id :: binary()
, expiry_interval :: non_neg_integer()
@ -28,6 +33,7 @@
-define(db_backend_key, [persistent_session_store, db_backend]).
-define(is_enabled_key, [persistent_session_store, enabled]).
-define(storage_type_key, [persistent_session_store, storage_type]).
-define(msg_retain, [persistent_session_store, max_retain_undelivered]).
-define(db_backend, (persistent_term:get(?db_backend_key))).

View File

@ -14,7 +14,7 @@
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_persistent_session_mnesia_backend).
-module(emqx_persistent_session_mnesia_disc_backend).
-include("emqx.hrl").
-include("emqx_persistent_session.hrl").
@ -36,7 +36,7 @@
]).
create_tables() ->
ok = mria:create_table(?SESSION_STORE, [
ok = mria:create_table(?SESSION_STORE_DISC, [
{type, set},
{rlog_shard, ?PERSISTENT_SESSION_SHARD},
{storage, disc_copies},
@ -44,7 +44,7 @@ create_tables() ->
{attributes, record_info(fields, session_store)},
{storage_properties, [{ets, [{read_concurrency, true}]}]}]),
ok = mria:create_table(?SESS_MSG_TAB, [
ok = mria:create_table(?SESS_MSG_TAB_DISC, [
{type, ordered_set},
{rlog_shard, ?PERSISTENT_SESSION_SHARD},
{storage, disc_copies},
@ -53,7 +53,7 @@ create_tables() ->
{storage_properties, [{ets, [{read_concurrency, true},
{write_concurrency, true}]}]}]),
ok = mria:create_table(?MSG_TAB, [
ok = mria:create_table(?MSG_TAB_DISC, [
{type, ordered_set},
{rlog_shard, ?PERSISTENT_SESSION_SHARD},
{storage, disc_copies},
@ -63,43 +63,43 @@ create_tables() ->
{write_concurrency, true}]}]}]).
first_session_message() ->
mnesia:dirty_first(?SESS_MSG_TAB).
mnesia:dirty_first(?SESS_MSG_TAB_DISC).
next_session_message(Key) ->
mnesia:dirty_next(?SESS_MSG_TAB, Key).
mnesia:dirty_next(?SESS_MSG_TAB_DISC, Key).
first_message_id() ->
mnesia:dirty_first(?MSG_TAB).
mnesia:dirty_first(?MSG_TAB_DISC).
next_message_id(Key) ->
mnesia:dirty_next(?MSG_TAB, Key).
mnesia:dirty_next(?MSG_TAB_DISC, Key).
delete_message(Key) ->
mria:dirty_delete(?MSG_TAB, Key).
mria:dirty_delete(?MSG_TAB_DISC, Key).
delete_session_message(Key) ->
mria:dirty_delete(?SESS_MSG_TAB, Key).
mria:dirty_delete(?SESS_MSG_TAB_DISC, Key).
put_session_store(SS) ->
mria:dirty_write(?SESSION_STORE, SS).
mria:dirty_write(?SESSION_STORE_DISC, SS).
delete_session_store(ClientID) ->
mria:dirty_delete(?SESSION_STORE, ClientID).
mria:dirty_delete(?SESSION_STORE_DISC, ClientID).
lookup_session_store(ClientID) ->
case mnesia:dirty_read(?SESSION_STORE, ClientID) of
case mnesia:dirty_read(?SESSION_STORE_DISC, ClientID) of
[] -> none;
[SS] -> {value, SS}
end.
put_session_message(SessMsg) ->
mria:dirty_write(?SESS_MSG_TAB, SessMsg).
mria:dirty_write(?SESS_MSG_TAB_DISC, SessMsg).
put_message(Msg) ->
mria:dirty_write(?MSG_TAB, Msg).
mria:dirty_write(?MSG_TAB_DISC, Msg).
get_message(MsgId) ->
case mnesia:read(?MSG_TAB, MsgId) of
case mnesia:read(?MSG_TAB_DISC, MsgId) of
[] -> error({msg_not_found, MsgId});
[Msg] -> Msg
end.

View File

@ -0,0 +1,110 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_persistent_session_mnesia_ram_backend).
-include("emqx.hrl").
-include("emqx_persistent_session.hrl").
-export([ create_tables/0
, first_message_id/0
, next_message_id/1
, delete_message/1
, first_session_message/0
, next_session_message/1
, delete_session_message/1
, put_session_store/1
, delete_session_store/1
, lookup_session_store/1
, put_session_message/1
, put_message/1
, get_message/1
, ro_transaction/1
]).
create_tables() ->
ok = mria:create_table(?SESSION_STORE_RAM, [
{type, set},
{rlog_shard, ?PERSISTENT_SESSION_SHARD},
{storage, ram_copies},
{record_name, session_store},
{attributes, record_info(fields, session_store)},
{storage_properties, [{ets, [{read_concurrency, true}]}]}]),
ok = mria:create_table(?SESS_MSG_TAB_RAM, [
{type, ordered_set},
{rlog_shard, ?PERSISTENT_SESSION_SHARD},
{storage, ram_copies},
{record_name, session_msg},
{attributes, record_info(fields, session_msg)},
{storage_properties, [{ets, [{read_concurrency, true},
{write_concurrency, true}]}]}]),
ok = mria:create_table(?MSG_TAB_RAM, [
{type, ordered_set},
{rlog_shard, ?PERSISTENT_SESSION_SHARD},
{storage, ram_copies},
{record_name, message},
{attributes, record_info(fields, message)},
{storage_properties, [{ets, [{read_concurrency, true},
{write_concurrency, true}]}]}]).
first_session_message() ->
mnesia:dirty_first(?SESS_MSG_TAB_RAM).
next_session_message(Key) ->
mnesia:dirty_next(?SESS_MSG_TAB_RAM, Key).
first_message_id() ->
mnesia:dirty_first(?MSG_TAB_RAM).
next_message_id(Key) ->
mnesia:dirty_next(?MSG_TAB_RAM, Key).
delete_message(Key) ->
mria:dirty_delete(?MSG_TAB_RAM, Key).
delete_session_message(Key) ->
mria:dirty_delete(?SESS_MSG_TAB_RAM, Key).
put_session_store(SS) ->
mria:dirty_write(?SESSION_STORE_RAM, SS).
delete_session_store(ClientID) ->
mria:dirty_delete(?SESSION_STORE_RAM, ClientID).
lookup_session_store(ClientID) ->
case mnesia:dirty_read(?SESSION_STORE_RAM, ClientID) of
[] -> none;
[SS] -> {value, SS}
end.
put_session_message(SessMsg) ->
mria:dirty_write(?SESS_MSG_TAB_RAM, SessMsg).
put_message(Msg) ->
mria:dirty_write(?MSG_TAB_RAM, Msg).
get_message(MsgId) ->
case mnesia:read(?MSG_TAB_RAM, MsgId) of
[] -> error({msg_not_found, MsgId});
[Msg] -> Msg
end.
ro_transaction(Fun) ->
{atomic, Res} = mria:ro_transaction(?PERSISTENT_SESSION_SHARD, Fun),
Res.

View File

@ -20,8 +20,10 @@
-export([ delete_direct_route/2
, delete_trie_route/2
, delete_session_trie_route/2
, insert_direct_route/2
, insert_trie_route/2
, insert_session_trie_route/2
, maybe_trans/3
]).
@ -30,8 +32,14 @@ insert_direct_route(Tab, Route) ->
insert_trie_route(RouteTab, Route = #route{topic = Topic}) ->
case mnesia:wread({RouteTab, Topic}) of
[] when RouteTab =:= emqx_route -> emqx_trie:insert(Topic);
[] when RouteTab =:= emqx_session_route -> emqx_trie:insert_session(Topic);
[] -> emqx_trie:insert(Topic);
_ -> ok
end,
mnesia:write(RouteTab, Route, sticky_write).
insert_session_trie_route(RouteTab, Route = #route{topic = Topic}) ->
case mnesia:wread({RouteTab, Topic}) of
[] -> emqx_trie:insert_session(Topic);
_ -> ok
end,
mnesia:write(RouteTab, Route, sticky_write).
@ -39,14 +47,20 @@ insert_trie_route(RouteTab, Route = #route{topic = Topic}) ->
delete_direct_route(RouteTab, Route) ->
mria:dirty_delete_object(RouteTab, Route).
delete_trie_route(RouteTab, Route = #route{topic = Topic}) ->
delete_trie_route(RouteTab, Route) ->
delete_trie_route(RouteTab, Route, normal).
delete_session_trie_route(RouteTab, Route) ->
delete_trie_route(RouteTab, Route, session).
delete_trie_route(RouteTab, Route = #route{topic = Topic}, Type) ->
case mnesia:wread({RouteTab, Topic}) of
[R] when R =:= Route ->
%% Remove route and trie
ok = mnesia:delete_object(RouteTab, Route, sticky_write),
case RouteTab of
emqx_route -> emqx_trie:delete(Topic);
emqx_session_route -> emqx_trie:delete_session(Topic)
case Type of
normal -> emqx_trie:delete(Topic);
session -> emqx_trie:delete_session(Topic)
end;
[_|_] ->
%% Remove route only

View File

@ -163,6 +163,10 @@ fields("persistent_session_store") ->
sc(boolean(),
#{ default => "false"
})},
{"storage_type",
sc(hoconsc:union([ram, disc]),
#{ default => disc
})},
{"max_retain_undelivered",
sc(duration(),
#{ default => "1h"

View File

@ -24,12 +24,8 @@
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
%% Mnesia bootstrap
-export([mnesia/1]).
-boot_mnesia({mnesia, [boot]}).
-export([ create_init_tab/0
, create_router_tab/0
, start_link/2]).
%% Route APIs
@ -60,7 +56,8 @@
-type(dest() :: node() | {group(), node()}).
-define(ROUTE_TAB, emqx_session_route).
-define(ROUTE_RAM_TAB, emqx_session_route_ram).
-define(ROUTE_DISC_TAB, emqx_session_route_disc).
-define(SESSION_INIT_TAB, session_init_tab).
@ -68,13 +65,21 @@
%% Mnesia bootstrap
%%--------------------------------------------------------------------
mnesia(boot) ->
ok = mria:create_table(?ROUTE_TAB, [
create_router_tab() ->
ok = mria:create_table(?ROUTE_DISC_TAB, [
{type, bag},
{rlog_shard, ?ROUTE_SHARD},
{storage, disc_copies},
{record_name, route},
{attributes, record_info(fields, route)},
{storage_properties, [{ets, [{read_concurrency, true},
{write_concurrency, true}]}]}]),
ok = mria:create_table(?ROUTE_RAM_TAB, [
{type, bag},
{rlog_shard, ?ROUTE_SHARD},
{storage, ram_copies},
{record_name, route},
{attributes, record_info(fields, route)},
{storage_properties, [{ets, [{read_concurrency, true},
{write_concurrency, true}]}]}]).
@ -103,11 +108,11 @@ do_add_route(Topic, SessionID) when is_binary(Topic) ->
false ->
case emqx_topic:wildcard(Topic) of
true ->
Fun = fun emqx_router_utils:insert_trie_route/2,
emqx_router_utils:maybe_trans(Fun, [?ROUTE_TAB, Route],
Fun = fun emqx_router_utils:insert_session_trie_route/2,
emqx_router_utils:maybe_trans(Fun, [route_tab(), Route],
?PERSISTENT_SESSION_SHARD);
false ->
emqx_router_utils:insert_direct_route(?ROUTE_TAB, Route)
emqx_router_utils:insert_direct_route(route_tab(), Route)
end
end.
@ -136,10 +141,10 @@ do_delete_route(Topic, SessionID) ->
Route = #route{topic = Topic, dest = SessionID},
case emqx_topic:wildcard(Topic) of
true ->
Fun = fun emqx_router_utils:delete_trie_route/2,
emqx_router_utils:maybe_trans(Fun, [?ROUTE_TAB, Route], ?PERSISTENT_SESSION_SHARD);
Fun = fun emqx_router_utils:delete_session_trie_route/2,
emqx_router_utils:maybe_trans(Fun, [route_tab(), Route], ?PERSISTENT_SESSION_SHARD);
false ->
emqx_router_utils:delete_direct_route(?ROUTE_TAB, Route)
emqx_router_utils:delete_direct_route(route_tab(), Route)
end.
%% @doc Print routes to a topic
@ -273,4 +278,10 @@ init_resume_worker(RemotePid, SessionID, #{ pmon := Pmon } = State) ->
%%--------------------------------------------------------------------
lookup_routes(Topic) ->
ets:lookup(?ROUTE_TAB, Topic).
ets:lookup(route_tab(), Topic).
route_tab() ->
case emqx_persistent_session:storage_type() of
disc -> ?ROUTE_DISC_TAB;
ram -> ?ROUTE_RAM_TAB
end.

View File

@ -48,7 +48,8 @@
-endif.
-define(TRIE, emqx_trie).
-define(SESSION_TRIE, emqx_session_trie).
-define(SESSION_DISC_TRIE, emqx_session_trie_disc).
-define(SESSION_RAM_TRIE, emqx_session_trie_ram).
-define(PREFIX(Prefix), {Prefix, 0}).
-define(TOPIC(Topic), {Topic, 1}).
@ -79,12 +80,19 @@ create_session_trie() ->
StoreProps = [{ets, [{read_concurrency, true},
{write_concurrency, true}
]}],
ok = mria:create_table(?SESSION_TRIE,
ok = mria:create_table(?SESSION_DISC_TRIE,
[{rlog_shard, ?ROUTE_SHARD},
{storage, disc_copies},
{record_name, ?TRIE},
{attributes, record_info(fields, ?TRIE)},
{type, ordered_set},
{storage_properties, StoreProps}]),
ok = mria:create_table(?SESSION_RAM_TRIE,
[{rlog_shard, ?ROUTE_SHARD},
{storage, ram_copies},
{record_name, ?TRIE},
{attributes, record_info(fields, ?TRIE)},
{type, ordered_set},
{storage_properties, StoreProps}]).
%%--------------------------------------------------------------------
@ -98,7 +106,7 @@ insert(Topic) when is_binary(Topic) ->
-spec(insert_session(emqx_topic:topic()) -> ok).
insert_session(Topic) when is_binary(Topic) ->
insert(Topic, ?SESSION_TRIE).
insert(Topic, session_trie()).
insert(Topic, Trie) when is_binary(Topic) ->
{TopicKey, PrefixKeys} = make_keys(Topic),
@ -115,7 +123,7 @@ delete(Topic) when is_binary(Topic) ->
%% @doc Delete a topic filter from the trie.
-spec(delete_session(emqx_topic:topic()) -> ok).
delete_session(Topic) when is_binary(Topic) ->
delete(Topic, ?SESSION_TRIE).
delete(Topic, session_trie()).
delete(Topic, Trie) when is_binary(Topic) ->
{TopicKey, PrefixKeys} = make_keys(Topic),
@ -131,7 +139,7 @@ match(Topic) when is_binary(Topic) ->
-spec(match_session(emqx_topic:topic()) -> list(emqx_topic:topic())).
match_session(Topic) when is_binary(Topic) ->
match(Topic, ?SESSION_TRIE).
match(Topic, session_trie()).
match(Topic, Trie) when is_binary(Topic) ->
Words = emqx_topic:words(Topic),
@ -154,7 +162,7 @@ match(Topic, Trie) when is_binary(Topic) ->
empty() -> empty(?TRIE).
empty_session() ->
empty(?SESSION_TRIE).
empty(session_trie()).
empty(Trie) -> ets:first(Trie) =:= '$end_of_table'.
@ -164,12 +172,19 @@ lock_tables() ->
-spec lock_session_tables() -> ok.
lock_session_tables() ->
mnesia:write_lock_table(?SESSION_TRIE).
mnesia:write_lock_table(session_trie()).
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
session_trie() ->
case emqx_persistent_session:storage_type() of
disc -> ?SESSION_DISC_TRIE;
ram -> ?SESSION_RAM_TRIE
end.
make_keys(Topic) ->
Words = emqx_topic:words(Topic),
{?TOPIC(Topic), [?PREFIX(Prefix) || Prefix <- make_prefixes(Words)]}.

View File

@ -50,13 +50,17 @@ groups() ->
SnabbkaffeTCs = [TC || TC <- TCs, is_snabbkaffe_tc(TC)],
GCTests = [TC || TC <- TCs, is_gc_tc(TC)],
OtherTCs = (TCs -- SnabbkaffeTCs) -- GCTests,
[ {persistent_store_enabled, [ {group, no_kill_connection_process}
, {group, kill_connection_process}
, {group, snabbkaffe}
, {group, gc_tests}
[ {persistent_store_enabled, [ {group, ram_tables}
, {group, disc_tables}
]}
, {persistent_store_disabled, [ {group, no_kill_connection_process}
]}
, { ram_tables, [], [ {group, no_kill_connection_process}
, {group, snabbkaffe}
, {group, gc_tests}]}
, { disc_tables, [], [ {group, no_kill_connection_process}
, {group, snabbkaffe}
, {group, gc_tests}]}
, {no_kill_connection_process, [], [{group, tcp}, {group, quic}, {group, ws}]}
, { kill_connection_process, [], [{group, tcp}, {group, quic}, {group, ws}]}
, {snabbkaffe, [], [{group, tcp_snabbkaffe}, {group, quic_snabbkaffe}, {group, ws_snabbkaffe}]}
@ -76,15 +80,23 @@ is_gc_tc(TC) ->
re:run(atom_to_list(TC), "^t_gc_") /= nomatch.
init_per_group(persistent_store_enabled, Config) ->
[{persistent_store_enabled, true}|Config];
init_per_group(Group, Config) when Group =:= ram_tables; Group =:= disc_tables ->
%% Start Apps
Reply = case Group =:= ram_tables of
true -> ram;
false -> disc
end,
emqx_common_test_helpers:boot_modules(all),
meck:new(emqx_config, [non_strict, passthrough, no_history, no_link]),
meck:expect(emqx_config, get, fun(?is_enabled_key) -> true;
(Other) -> meck:passthrough([Other])
meck:expect(emqx_config, get, fun(?storage_type_key) -> Reply;
(?is_enabled_key) -> true;
(Other) -> meck:passthrough([Other])
end),
emqx_common_test_helpers:start_apps([], fun set_special_confs/1),
?assertEqual(true, emqx_persistent_session:is_store_enabled()),
[{persistent_store_enabled, true}|Config];
?assertEqual(Reply, emqx_persistent_session:storage_type()),
Config;
init_per_group(persistent_store_disabled, Config) ->
%% Start Apps
emqx_common_test_helpers:boot_modules(all),
@ -125,15 +137,20 @@ init_per_group(gc_tests, Config) ->
receive stop -> ok end
end),
meck:new(mnesia, [non_strict, passthrough, no_history, no_link]),
meck:expect(mnesia, dirty_first, fun(?SESS_MSG_TAB) -> ets:first(SessionMsgEts);
(?MSG_TAB) -> ets:first(MsgEts);
(X) -> meck:passthrough(X)
meck:expect(mnesia, dirty_first, fun(?SESS_MSG_TAB_RAM) -> ets:first(SessionMsgEts);
(?SESS_MSG_TAB_DISC) -> ets:first(SessionMsgEts);
(?MSG_TAB_RAM) -> ets:first(MsgEts);
(?MSG_TAB_DISC) -> ets:first(MsgEts);
(X) -> meck:passthrough([X])
end),
meck:expect(mnesia, dirty_next, fun(?SESS_MSG_TAB, X) -> ets:next(SessionMsgEts, X);
(?MSG_TAB, X) -> ets:next(MsgEts, X);
meck:expect(mnesia, dirty_next, fun(?SESS_MSG_TAB_RAM, X) -> ets:next(SessionMsgEts, X);
(?SESS_MSG_TAB_DISC, X) -> ets:next(SessionMsgEts, X);
(?MSG_TAB_RAM, X) -> ets:next(MsgEts, X);
(?MSG_TAB_DISC, X) -> ets:next(MsgEts, X);
(Tab, X) -> meck:passthrough([Tab, X])
end),
meck:expect(mnesia, dirty_delete, fun(?MSG_TAB, X) -> ets:delete(MsgEts, X);
meck:expect(mnesia, dirty_delete, fun(?MSG_TAB_RAM, X) -> ets:delete(MsgEts, X);
(?MSG_TAB_DISC, X) -> ets:delete(MsgEts, X);
(Tab, X) -> meck:passthrough([Tab, X])
end),
[{store_owner, Pid}, {session_msg_store, SessionMsgEts}, {msg_store, MsgEts} | Config].
@ -154,7 +171,7 @@ end_per_group(gc_tests, Config) ->
meck:unload(mnesia),
?config(store_owner, Config) ! stop,
ok;
end_per_group(persistent_store_enabled, _Config) ->
end_per_group(Group, _Config) when Group =:= ram_tables; Group =:= disc_tables ->
meck:unload(emqx_config),
emqx_common_test_helpers:stop_apps([]);
end_per_group(persistent_store_disabled, _Config) ->