diff --git a/apps/emqx/src/emqx_persistent_session.erl b/apps/emqx/src/emqx_persistent_session.erl index 49e442d53..bf815f27c 100644 --- a/apps/emqx/src/emqx_persistent_session.erl +++ b/apps/emqx/src/emqx_persistent_session.erl @@ -82,14 +82,14 @@ init_db_backend() -> case is_store_enabled() of true -> - TableType = + Backend = case emqx_config:get(?db_backend_key) of - mnesia_ram -> ram_copies; - mnesia_disc -> disc_copies + mnesia_ram -> emqx_persistent_session_mnesia_ram_backend; + mnesia_disc -> emqx_persistent_session_mnesia_disc_backend end, - ok = emqx_trie:create_session_trie(TableType), - emqx_persistent_session_mnesia_backend:create_tables(TableType), - persistent_term:put(?db_backend_module, emqx_persistent_session_mnesia_backend), + persistent_term:put(?db_backend_module, Backend), + Backend:create_tables(), + ok = emqx_trie:create_session_trie(), ok; false -> persistent_term:put(?db_backend_module, emqx_persistent_session_dummy_backend), diff --git a/apps/emqx/src/emqx_persistent_session.hrl b/apps/emqx/src/emqx_persistent_session.hrl index 371139441..94b99c390 100644 --- a/apps/emqx/src/emqx_persistent_session.hrl +++ b/apps/emqx/src/emqx_persistent_session.hrl @@ -14,10 +14,6 @@ %% limitations under the License. %%-------------------------------------------------------------------- --define(SESSION_STORE, emqx_session_store). --define(SESS_MSG_TAB, emqx_session_msg). --define(MSG_TAB, emqx_persistent_msg). - -record(session_store, { client_id :: binary() , expiry_interval :: non_neg_integer() , ts :: non_neg_integer() diff --git a/apps/emqx/src/emqx_persistent_session_mnesia_disc_backend.erl b/apps/emqx/src/emqx_persistent_session_mnesia_disc_backend.erl new file mode 100644 index 000000000..2a7b0856b --- /dev/null +++ b/apps/emqx/src/emqx_persistent_session_mnesia_disc_backend.erl @@ -0,0 +1,114 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_persistent_session_mnesia_disc_backend). + +-include("emqx.hrl"). +-include("emqx_persistent_session.hrl"). + +-define(SESSION_STORE, emqx_session_store_disc). +-define(SESS_MSG_TAB, emqx_session_msg_disc). +-define(MSG_TAB, emqx_persistent_msg_disc). + +-export([ create_tables/0 + , first_message_id/0 + , next_message_id/1 + , delete_message/1 + , first_session_message/0 + , next_session_message/1 + , delete_session_message/1 + , put_session_store/1 + , delete_session_store/1 + , lookup_session_store/1 + , put_session_message/1 + , put_message/1 + , get_message/1 + , ro_transaction/1 + ]). + +create_tables() -> + ok = mria:create_table(?SESSION_STORE, [ + {type, set}, + {rlog_shard, ?PERSISTENT_SESSION_SHARD}, + {storage, disc_copies}, + {record_name, session_store}, + {attributes, record_info(fields, session_store)}, + {storage_properties, [{ets, [{read_concurrency, true}]}]}]), + + ok = mria:create_table(?SESS_MSG_TAB, [ + {type, ordered_set}, + {rlog_shard, ?PERSISTENT_SESSION_SHARD}, + {storage, disc_copies}, + {record_name, session_msg}, + {attributes, record_info(fields, session_msg)}, + {storage_properties, [{ets, [{read_concurrency, true}, + {write_concurrency, true}]}]}]), + + ok = mria:create_table(?MSG_TAB, [ + {type, ordered_set}, + {rlog_shard, ?PERSISTENT_SESSION_SHARD}, + {storage, disc_copies}, + {record_name, message}, + {attributes, record_info(fields, message)}, + {storage_properties, [{ets, [{read_concurrency, true}, + {write_concurrency, true}]}]}]). + +first_session_message() -> + mnesia:dirty_first(?SESS_MSG_TAB). + +next_session_message(Key) -> + mnesia:dirty_next(?SESS_MSG_TAB, Key). + +first_message_id() -> + mnesia:dirty_first(?MSG_TAB). + +next_message_id(Key) -> + mnesia:dirty_next(?MSG_TAB, Key). + +delete_message(Key) -> + mria:dirty_delete(?MSG_TAB, Key). + +delete_session_message(Key) -> + mria:dirty_delete(?SESS_MSG_TAB, Key). + +put_session_store(SS) -> + mria:dirty_write(?SESSION_STORE, SS). + +delete_session_store(ClientID) -> + mria:dirty_delete(?SESSION_STORE, ClientID). + +lookup_session_store(ClientID) -> + case mnesia:dirty_read(?SESSION_STORE, ClientID) of + [] -> none; + [SS] -> {value, SS} + end. + +put_session_message(SessMsg) -> + mria:dirty_write(?SESS_MSG_TAB, SessMsg). + +put_message(Msg) -> + mria:dirty_write(?MSG_TAB, Msg). + +get_message(MsgId) -> + case mnesia:read(?MSG_TAB, MsgId) of + [] -> error({msg_not_found, MsgId}); + [Msg] -> Msg + end. + +ro_transaction(Fun) -> + {atomic, Res} = mria:ro_transaction(?PERSISTENT_SESSION_SHARD, Fun), + Res. + diff --git a/apps/emqx/src/emqx_persistent_session_mnesia_backend.erl b/apps/emqx/src/emqx_persistent_session_mnesia_ram_backend.erl similarity index 90% rename from apps/emqx/src/emqx_persistent_session_mnesia_backend.erl rename to apps/emqx/src/emqx_persistent_session_mnesia_ram_backend.erl index 9d587292f..b4b70abb8 100644 --- a/apps/emqx/src/emqx_persistent_session_mnesia_backend.erl +++ b/apps/emqx/src/emqx_persistent_session_mnesia_ram_backend.erl @@ -14,12 +14,16 @@ %% limitations under the License. %%-------------------------------------------------------------------- --module(emqx_persistent_session_mnesia_backend). +-module(emqx_persistent_session_mnesia_ram_backend). -include("emqx.hrl"). -include("emqx_persistent_session.hrl"). --export([ create_tables/1 +-define(SESSION_STORE, emqx_session_store_ram). +-define(SESS_MSG_TAB, emqx_session_msg_ram). +-define(MSG_TAB, emqx_persistent_msg_ram). + +-export([ create_tables/0 , first_message_id/0 , next_message_id/1 , delete_message/1 @@ -35,12 +39,11 @@ , ro_transaction/1 ]). -create_tables(TableType) when TableType =:= ram_copies; - TableType =:= disc_copies -> +create_tables() -> ok = mria:create_table(?SESSION_STORE, [ {type, set}, {rlog_shard, ?PERSISTENT_SESSION_SHARD}, - {storage, TableType}, + {storage, ram_copies}, {record_name, session_store}, {attributes, record_info(fields, session_store)}, {storage_properties, [{ets, [{read_concurrency, true}]}]}]), @@ -48,7 +51,7 @@ create_tables(TableType) when TableType =:= ram_copies; ok = mria:create_table(?SESS_MSG_TAB, [ {type, ordered_set}, {rlog_shard, ?PERSISTENT_SESSION_SHARD}, - {storage, TableType}, + {storage, ram_copies}, {record_name, session_msg}, {attributes, record_info(fields, session_msg)}, {storage_properties, [{ets, [{read_concurrency, true}, @@ -57,7 +60,7 @@ create_tables(TableType) when TableType =:= ram_copies; ok = mria:create_table(?MSG_TAB, [ {type, ordered_set}, {rlog_shard, ?PERSISTENT_SESSION_SHARD}, - {storage, TableType}, + {storage, ram_copies}, {record_name, message}, {attributes, record_info(fields, message)}, {storage_properties, [{ets, [{read_concurrency, true}, diff --git a/apps/emqx/src/emqx_trie.erl b/apps/emqx/src/emqx_trie.erl index d93cfa483..f27cf7749 100644 --- a/apps/emqx/src/emqx_trie.erl +++ b/apps/emqx/src/emqx_trie.erl @@ -17,10 +17,11 @@ -module(emqx_trie). -include("emqx.hrl"). +-include("emqx_persistent_session.hrl"). %% Mnesia bootstrap -export([ mnesia/1 - , create_session_trie/1 + , create_session_trie/0 ]). -boot_mnesia({mnesia, [boot]}). @@ -48,7 +49,8 @@ -endif. -define(TRIE, emqx_trie). --define(SESSION_TRIE, emqx_session_trie). +-define(SESSION_TRIE_RAM, emqx_session_trie_ram). +-define(SESSION_TRIE_DISC, emqx_session_trie_disc). -define(PREFIX(Prefix), {Prefix, 0}). -define(TOPIC(Topic), {Topic, 1}). @@ -57,6 +59,12 @@ , count = 0 :: non_neg_integer() }). +-define(ACTIVE_SESSION_TRIE, case emqx_config:get(?db_backend_key) of + mnesia_ram -> ?SESSION_TRIE_RAM; + mnesia_disc -> ?SESSION_TRIE_DISC + end). + + %%-------------------------------------------------------------------- %% Mnesia bootstrap %%-------------------------------------------------------------------- @@ -75,19 +83,26 @@ mnesia(boot) -> {type, ordered_set}, {storage_properties, StoreProps}]). -create_session_trie(TableType) when TableType =:= ram_copies; - TableType =:= disc_copies -> +create_session_trie() -> StoreProps = [{ets, [{read_concurrency, true}, {write_concurrency, true} ]}], - ok = mria:create_table(?SESSION_TRIE, + ok = mria:create_table(?SESSION_TRIE_RAM, [{rlog_shard, ?ROUTE_SHARD}, - {storage, TableType}, + {storage, ram_copies}, + {record_name, ?TRIE}, + {attributes, record_info(fields, ?TRIE)}, + {type, ordered_set}, + {storage_properties, StoreProps}]), + ok = mria:create_table(?SESSION_TRIE_DISC, + [{rlog_shard, ?ROUTE_SHARD}, + {storage, disc_copies}, {record_name, ?TRIE}, {attributes, record_info(fields, ?TRIE)}, {type, ordered_set}, {storage_properties, StoreProps}]). + %%-------------------------------------------------------------------- %% Topics APIs %%-------------------------------------------------------------------- @@ -99,7 +114,7 @@ insert(Topic) when is_binary(Topic) -> -spec(insert_session(emqx_topic:topic()) -> ok). insert_session(Topic) when is_binary(Topic) -> - insert(Topic, ?SESSION_TRIE). + insert(Topic, ?ACTIVE_SESSION_TRIE). insert(Topic, Trie) when is_binary(Topic) -> {TopicKey, PrefixKeys} = make_keys(Topic), @@ -116,7 +131,7 @@ delete(Topic) when is_binary(Topic) -> %% @doc Delete a topic filter from the trie. -spec(delete_session(emqx_topic:topic()) -> ok). delete_session(Topic) when is_binary(Topic) -> - delete(Topic, ?SESSION_TRIE). + delete(Topic, ?ACTIVE_SESSION_TRIE). delete(Topic, Trie) when is_binary(Topic) -> {TopicKey, PrefixKeys} = make_keys(Topic), @@ -132,7 +147,7 @@ match(Topic) when is_binary(Topic) -> -spec(match_session(emqx_topic:topic()) -> list(emqx_topic:topic())). match_session(Topic) when is_binary(Topic) -> - match(Topic, ?SESSION_TRIE). + match(Topic, ?ACTIVE_SESSION_TRIE). match(Topic, Trie) when is_binary(Topic) -> Words = emqx_topic:words(Topic), @@ -155,7 +170,7 @@ match(Topic, Trie) when is_binary(Topic) -> empty() -> empty(?TRIE). empty_session() -> - empty(?SESSION_TRIE). + empty(?ACTIVE_SESSION_TRIE). empty(Trie) -> ets:first(Trie) =:= '$end_of_table'. @@ -165,7 +180,7 @@ lock_tables() -> -spec lock_session_tables() -> ok. lock_session_tables() -> - mnesia:write_lock_table(?SESSION_TRIE). + mnesia:write_lock_table(?ACTIVE_SESSION_TRIE). %%-------------------------------------------------------------------- %% Internal functions diff --git a/apps/emqx/test/emqx_persistent_session_SUITE.erl b/apps/emqx/test/emqx_persistent_session_SUITE.erl index 3e992ce2d..7647f088f 100644 --- a/apps/emqx/test/emqx_persistent_session_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_session_SUITE.erl @@ -124,15 +124,15 @@ init_per_group(gc_tests, Config) -> receive stop -> ok end end), meck:new(mnesia, [non_strict, passthrough, no_history, no_link]), - meck:expect(mnesia, dirty_first, fun(?SESS_MSG_TAB) -> ets:first(SessionMsgEts); - (?MSG_TAB) -> ets:first(MsgEts); + meck:expect(mnesia, dirty_first, fun(emqx_session_msg_ram) -> ets:first(SessionMsgEts); + (emqx_persistent_msg_ram) -> ets:first(MsgEts); (X) -> meck:passthrough(X) end), - meck:expect(mnesia, dirty_next, fun(?SESS_MSG_TAB, X) -> ets:next(SessionMsgEts, X); - (?MSG_TAB, X) -> ets:next(MsgEts, X); + meck:expect(mnesia, dirty_next, fun(emqx_session_msg_ram, X) -> ets:next(SessionMsgEts, X); + (emqx_persistent_msg_ram, X) -> ets:next(MsgEts, X); (Tab, X) -> meck:passthrough([Tab, X]) end), - meck:expect(mnesia, dirty_delete, fun(?MSG_TAB, X) -> ets:delete(MsgEts, X); + meck:expect(mnesia, dirty_delete, fun(emqx_persistent_msg_ram, X) -> ets:delete(MsgEts, X); (Tab, X) -> meck:passthrough([Tab, X]) end), [{store_owner, Pid}, {session_msg_store, SessionMsgEts}, {msg_store, MsgEts} | Config].