diff --git a/apps/emqx/i18n/emqx_schema_i18n.conf b/apps/emqx/i18n/emqx_schema_i18n.conf index 2f92b9904..d31530810 100644 --- a/apps/emqx/i18n/emqx_schema_i18n.conf +++ b/apps/emqx/i18n/emqx_schema_i18n.conf @@ -509,22 +509,38 @@ emqx_schema { } } - persistent_session_store_storage_type { + persistent_session_store_backend { desc { - en: "Store information about persistent sessions on disc or in ram.\n" - "If ram is chosen, all information about persistent sessions remains\n" - "as long as at least one node in a cluster is alive to keep the information.\n" - "If disc is chosen, the information is persisted on disc and will survive\n" - "cluster restart, at the price of more disc usage and less throughput.\n" - zh: "将有关持久会话的信息存储在磁盘或内存中。\n" - "如果选择了ram,有关持久会话的所有信息将保留\n" - "只要群集中至少有一个节点处于活动状态,就可以保留信息。\n" - "如果选择了光盘,则信息将保留在光盘上,并且将继续存在\n" - "群集重新启动,代价是磁盘使用量增加,吞吐量降低。\n" + en: "Database backend used to store information about the persistent sessions.\n" + "- `builtin`: Use an embedded database (mria)" + zh: "用于存储持久性会话信息的数据库后端\n" + "- `builtin`: 使用一个嵌入式数据库(mria)" } label: { - en: "Storage type" - zh: "存储类型" + en: "Backend" + zh: "后台" + } + } + + persistent_store_on_disc { + desc { + en: "Persist session data on disc. If `false`, the data will be stored in RAM." + zh: "将会话数据保留在磁盘上。如果`false`,数据将被存储在RAM中。" + } + label: { + en: "Persist on disc" + zh: "坚持 在光盘上" + } + } + + persistent_store_ram_cache { + desc { + en: "Maintain a copy of the data in RAM for faster access." + zh: "在RAM中保持一份数据的副本,以便更快地访问。" + } + label: { + en: "RAM cache" + zh: "RAM缓存" } } @@ -569,6 +585,39 @@ emqx_schema { } } + persistent_session_builtin_session_table { + desc { + en: "Tuning for built-in session table." + zh: "对内置的会话表进行调整。" + } + label: { + en: "Persistent session." + zh: "持久会话。" + } + } + + persistent_session_builtin_sess_msg_table { + desc { + en: "Tuning for built-in session messages table." + zh: "对内置的会话信息表进行调整。" + } + label: { + en: "Persistent session messages." + zh: "持久的会话信息。" + } + } + + persistent_session_builtin_messages_table { + desc { + en: "Tuning for built-in messages table." + zh: "对内置信息表进行调校。" + } + label: { + en: "Persistent messages." + zh: "持久性信息。" + } + } + stats_enable { desc { en: "Enable/disable statistic data collection." diff --git a/apps/emqx/src/emqx_persistent_session_mnesia_disc_backend.erl b/apps/emqx/src/emqx_persistent_session_mnesia_disc_backend.erl deleted file mode 100644 index 3a4dd5b56..000000000 --- a/apps/emqx/src/emqx_persistent_session_mnesia_disc_backend.erl +++ /dev/null @@ -1,121 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2021-2022 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_persistent_session_mnesia_disc_backend). - --include("emqx.hrl"). --include("emqx_persistent_session.hrl"). - --export([ - create_tables/0, - first_message_id/0, - next_message_id/1, - delete_message/1, - first_session_message/0, - next_session_message/1, - delete_session_message/1, - put_session_store/1, - delete_session_store/1, - lookup_session_store/1, - put_session_message/1, - put_message/1, - get_message/1, - ro_transaction/1 -]). - -create_tables() -> - ok = mria:create_table(?SESSION_STORE_DISC, [ - {type, set}, - {rlog_shard, ?PERSISTENT_SESSION_SHARD}, - {storage, disc_copies}, - {record_name, session_store}, - {attributes, record_info(fields, session_store)}, - {storage_properties, [{ets, [{read_concurrency, true}]}]} - ]), - - ok = mria:create_table(?SESS_MSG_TAB_DISC, [ - {type, ordered_set}, - {rlog_shard, ?PERSISTENT_SESSION_SHARD}, - {storage, disc_copies}, - {record_name, session_msg}, - {attributes, record_info(fields, session_msg)}, - {storage_properties, [ - {ets, [ - {read_concurrency, true}, - {write_concurrency, true} - ]} - ]} - ]), - - ok = mria:create_table(?MSG_TAB_DISC, [ - {type, ordered_set}, - {rlog_shard, ?PERSISTENT_SESSION_SHARD}, - {storage, disc_copies}, - {record_name, message}, - {attributes, record_info(fields, message)}, - {storage_properties, [ - {ets, [ - {read_concurrency, true}, - {write_concurrency, true} - ]} - ]} - ]). - -first_session_message() -> - mnesia:dirty_first(?SESS_MSG_TAB_DISC). - -next_session_message(Key) -> - mnesia:dirty_next(?SESS_MSG_TAB_DISC, Key). - -first_message_id() -> - mnesia:dirty_first(?MSG_TAB_DISC). - -next_message_id(Key) -> - mnesia:dirty_next(?MSG_TAB_DISC, Key). - -delete_message(Key) -> - mria:dirty_delete(?MSG_TAB_DISC, Key). - -delete_session_message(Key) -> - mria:dirty_delete(?SESS_MSG_TAB_DISC, Key). - -put_session_store(SS) -> - mria:dirty_write(?SESSION_STORE_DISC, SS). - -delete_session_store(ClientID) -> - mria:dirty_delete(?SESSION_STORE_DISC, ClientID). - -lookup_session_store(ClientID) -> - case mnesia:dirty_read(?SESSION_STORE_DISC, ClientID) of - [] -> none; - [SS] -> {value, SS} - end. - -put_session_message(SessMsg) -> - mria:dirty_write(?SESS_MSG_TAB_DISC, SessMsg). - -put_message(Msg) -> - mria:dirty_write(?MSG_TAB_DISC, Msg). - -get_message(MsgId) -> - case mnesia:read(?MSG_TAB_DISC, MsgId) of - [] -> error({msg_not_found, MsgId}); - [Msg] -> Msg - end. - -ro_transaction(Fun) -> - {atomic, Res} = mria:ro_transaction(?PERSISTENT_SESSION_SHARD, Fun), - Res. diff --git a/apps/emqx/src/emqx_persistent_session_mnesia_ram_backend.erl b/apps/emqx/src/emqx_persistent_session_mnesia_ram_backend.erl deleted file mode 100644 index f6d956079..000000000 --- a/apps/emqx/src/emqx_persistent_session_mnesia_ram_backend.erl +++ /dev/null @@ -1,121 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2021-2022 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_persistent_session_mnesia_ram_backend). - --include("emqx.hrl"). --include("emqx_persistent_session.hrl"). - --export([ - create_tables/0, - first_message_id/0, - next_message_id/1, - delete_message/1, - first_session_message/0, - next_session_message/1, - delete_session_message/1, - put_session_store/1, - delete_session_store/1, - lookup_session_store/1, - put_session_message/1, - put_message/1, - get_message/1, - ro_transaction/1 -]). - -create_tables() -> - ok = mria:create_table(?SESSION_STORE_RAM, [ - {type, set}, - {rlog_shard, ?PERSISTENT_SESSION_SHARD}, - {storage, ram_copies}, - {record_name, session_store}, - {attributes, record_info(fields, session_store)}, - {storage_properties, [{ets, [{read_concurrency, true}]}]} - ]), - - ok = mria:create_table(?SESS_MSG_TAB_RAM, [ - {type, ordered_set}, - {rlog_shard, ?PERSISTENT_SESSION_SHARD}, - {storage, ram_copies}, - {record_name, session_msg}, - {attributes, record_info(fields, session_msg)}, - {storage_properties, [ - {ets, [ - {read_concurrency, true}, - {write_concurrency, true} - ]} - ]} - ]), - - ok = mria:create_table(?MSG_TAB_RAM, [ - {type, ordered_set}, - {rlog_shard, ?PERSISTENT_SESSION_SHARD}, - {storage, ram_copies}, - {record_name, message}, - {attributes, record_info(fields, message)}, - {storage_properties, [ - {ets, [ - {read_concurrency, true}, - {write_concurrency, true} - ]} - ]} - ]). - -first_session_message() -> - mnesia:dirty_first(?SESS_MSG_TAB_RAM). - -next_session_message(Key) -> - mnesia:dirty_next(?SESS_MSG_TAB_RAM, Key). - -first_message_id() -> - mnesia:dirty_first(?MSG_TAB_RAM). - -next_message_id(Key) -> - mnesia:dirty_next(?MSG_TAB_RAM, Key). - -delete_message(Key) -> - mria:dirty_delete(?MSG_TAB_RAM, Key). - -delete_session_message(Key) -> - mria:dirty_delete(?SESS_MSG_TAB_RAM, Key). - -put_session_store(SS) -> - mria:dirty_write(?SESSION_STORE_RAM, SS). - -delete_session_store(ClientID) -> - mria:dirty_delete(?SESSION_STORE_RAM, ClientID). - -lookup_session_store(ClientID) -> - case mnesia:dirty_read(?SESSION_STORE_RAM, ClientID) of - [] -> none; - [SS] -> {value, SS} - end. - -put_session_message(SessMsg) -> - mria:dirty_write(?SESS_MSG_TAB_RAM, SessMsg). - -put_message(Msg) -> - mria:dirty_write(?MSG_TAB_RAM, Msg). - -get_message(MsgId) -> - case mnesia:read(?MSG_TAB_RAM, MsgId) of - [] -> error({msg_not_found, MsgId}); - [Msg] -> Msg - end. - -ro_transaction(Fun) -> - {atomic, Res} = mria:ro_transaction(?PERSISTENT_SESSION_SHARD, Fun), - Res. diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 46949cc4e..039808c4b 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -212,12 +212,36 @@ fields("persistent_session_store") -> desc => ?DESC(persistent_session_store_enabled) } )}, - {"storage_type", + {"on_disc", sc( - hoconsc:union([ram, disc]), + boolean(), #{ - default => disc, - desc => ?DESC(persistent_session_store_storage_type) + default => true, + desc => ?DESC(persistent_store_on_disc) + } + )}, + {"ram_cache", + sc( + boolean(), + #{ + default => false, + desc => ?DESC(persistent_store_ram_cache) + } + )}, + {"backend", + sc( + hoconsc:union([ref("persistent_session_builtin")]), + #{ + default => #{ + <<"type">> => <<"builtin">>, + <<"session">> => + #{<<"ram_cache">> => <<"true">>}, + <<"session_messages">> => + #{<<"ram_cache">> => <<"true">>}, + <<"messages">> => + #{<<"ram_cache">> => <<"false">>} + }, + desc => ?DESC(persistent_session_store_backend) } )}, {"max_retain_undelivered", @@ -245,6 +269,33 @@ fields("persistent_session_store") -> } )} ]; +fields("persistent_table_mria_opts") -> + [ + {"ram_cache", + sc( + boolean(), + #{ + default => true, + desc => ?DESC(persistent_store_ram_cache) + } + )} + ]; +fields("persistent_session_builtin") -> + [ + {"type", sc(hoconsc:enum([builtin]), #{default => builtin, desc => ""})}, + {"session", + sc(ref("persistent_table_mria_opts"), #{ + desc => ?DESC(persistent_session_builtin_session_table) + })}, + {"session_messages", + sc(ref("persistent_table_mria_opts"), #{ + desc => ?DESC(persistent_session_builtin_sess_msg_table) + })}, + {"messages", + sc(ref("persistent_table_mria_opts"), #{ + desc => ?DESC(persistent_session_builtin_messages_table) + })} + ]; fields("stats") -> [ {"enable", @@ -1526,6 +1577,10 @@ base_listener() -> desc("persistent_session_store") -> "Settings for message persistence."; +desc("persistent_session_builtin") -> + "Settings for the built-in storage engine of persistent messages."; +desc("persistent_table_mria_opts") -> + "Tuning options for the mria table."; desc("stats") -> "Enable/disable statistic data collection.\n" "Statistic data such as message receive/send count/rate etc. " diff --git a/apps/emqx/src/emqx_trie.erl b/apps/emqx/src/emqx_trie.erl index 013cb8f9b..30725be6f 100644 --- a/apps/emqx/src/emqx_trie.erl +++ b/apps/emqx/src/emqx_trie.erl @@ -50,8 +50,7 @@ -compile(nowarn_export_all). -endif. --define(SESSION_DISC_TRIE, emqx_session_trie_disc). --define(SESSION_RAM_TRIE, emqx_session_trie_ram). +-define(SESSION_TRIE, emqx_session_trie). -define(PREFIX(Prefix), {Prefix, 0}). -define(TOPIC(Topic), {Topic, 1}). @@ -82,7 +81,12 @@ mnesia(boot) -> {storage_properties, StoreProps} ]). -create_session_trie(disc) -> +create_session_trie(Type) -> + Storage = + case Type of + disc -> disc_copies; + ram -> ram_copies + end, StoreProps = [ {ets, [ {read_concurrency, true}, @@ -90,28 +94,10 @@ create_session_trie(disc) -> ]} ], ok = mria:create_table( - ?SESSION_DISC_TRIE, + ?SESSION_TRIE, [ {rlog_shard, ?ROUTE_SHARD}, - {storage, disc_copies}, - {record_name, ?TRIE}, - {attributes, record_info(fields, ?TRIE)}, - {type, ordered_set}, - {storage_properties, StoreProps} - ] - ); -create_session_trie(ram) -> - StoreProps = [ - {ets, [ - {read_concurrency, true}, - {write_concurrency, true} - ]} - ], - ok = mria:create_table( - ?SESSION_RAM_TRIE, - [ - {rlog_shard, ?ROUTE_SHARD}, - {storage, ram_copies}, + {storage, Storage}, {record_name, ?TRIE}, {attributes, record_info(fields, ?TRIE)}, {type, ordered_set}, @@ -204,10 +190,7 @@ lock_session_tables() -> %%-------------------------------------------------------------------- session_trie() -> - case emqx_persistent_session:storage_type() of - disc -> ?SESSION_DISC_TRIE; - ram -> ?SESSION_RAM_TRIE - end. + ?SESSION_TRIE. make_keys(Topic) -> Words = emqx_topic:words(Topic), diff --git a/apps/emqx/src/emqx_persistent_session.erl b/apps/emqx/src/persistent_session/emqx_persistent_session.erl similarity index 97% rename from apps/emqx/src/emqx_persistent_session.erl rename to apps/emqx/src/persistent_session/emqx_persistent_session.erl index 52cbbd3a9..527206674 100644 --- a/apps/emqx/src/emqx_persistent_session.erl +++ b/apps/emqx/src/persistent_session/emqx_persistent_session.erl @@ -19,6 +19,7 @@ -export([ is_store_enabled/0, init_db_backend/0, + storage_backend/0, storage_type/0 ]). @@ -81,6 +82,9 @@ -type gc_traverse_fun() :: fun(('delete' | 'marker' | 'abandoned', sess_msg_key()) -> 'ok'). +%% EMQX configuration keys +-define(conf_storage_backend, [persistent_session_store, backend, type]). + %%-------------------------------------------------------------------- %% Init %%-------------------------------------------------------------------- @@ -91,27 +95,23 @@ init_db_backend() -> StorageType = storage_type(), ok = emqx_trie:create_session_trie(StorageType), ok = emqx_session_router:create_router_tab(StorageType), - case StorageType of - disc -> - emqx_persistent_session_mnesia_disc_backend:create_tables(), - persistent_term:put( - ?db_backend_key, emqx_persistent_session_mnesia_disc_backend - ); - ram -> - emqx_persistent_session_mnesia_ram_backend:create_tables(), - persistent_term:put(?db_backend_key, emqx_persistent_session_mnesia_ram_backend) + case storage_backend() of + builtin -> + emqx_persistent_session_backend_builtin:create_tables(), + persistent_term:put(?db_backend_key, emqx_persistent_session_backend_builtin) end, ok; false -> - persistent_term:put(?db_backend_key, emqx_persistent_session_dummy_backend), + persistent_term:put(?db_backend_key, emqx_persistent_session_backend_dummy), ok end. is_store_enabled() -> emqx_config:get(?is_enabled_key). -storage_type() -> - emqx_config:get(?storage_type_key). +-spec storage_backend() -> builtin. +storage_backend() -> + emqx_config:get(?conf_storage_backend). %%-------------------------------------------------------------------- %% Session message ADT API @@ -557,3 +557,10 @@ gc_traverse({S, _MsgID, _STopic, ?DELIVERED} = Key, SessionID, Abandoned, Fun) - %% We have a message that is marked as ?DELIVERED, but the ?UNDELIVERED is missing. NewAbandoned = S =:= SessionID andalso Abandoned, gc_traverse(next_session_message(Key), S, NewAbandoned, Fun). + +-spec storage_type() -> ram | disc. +storage_type() -> + case emqx_config:get(?on_disc_key) of + true -> disc; + false -> ram + end. diff --git a/apps/emqx/src/emqx_persistent_session.hrl b/apps/emqx/src/persistent_session/emqx_persistent_session.hrl similarity index 65% rename from apps/emqx/src/emqx_persistent_session.hrl rename to apps/emqx/src/persistent_session/emqx_persistent_session.hrl index 094963163..03b823401 100644 --- a/apps/emqx/src/emqx_persistent_session.hrl +++ b/apps/emqx/src/persistent_session/emqx_persistent_session.hrl @@ -14,15 +14,6 @@ %% limitations under the License. %%-------------------------------------------------------------------- --define(SESSION_STORE_DISC, emqx_session_store_disc). --define(SESSION_STORE_RAM, emqx_session_store_ram). - --define(SESS_MSG_TAB_DISC, emqx_session_msg_disc). --define(SESS_MSG_TAB_RAM, emqx_session_msg_ram). - --define(MSG_TAB_DISC, emqx_persistent_msg_disc). --define(MSG_TAB_RAM, emqx_persistent_msg_ram). - -record(session_store, { client_id :: binary(), expiry_interval :: non_neg_integer(), @@ -35,9 +26,14 @@ val = [] :: [] }). --define(db_backend_key, [persistent_session_store, db_backend]). --define(is_enabled_key, [persistent_session_store, enabled]). --define(storage_type_key, [persistent_session_store, storage_type]). --define(msg_retain, [persistent_session_store, max_retain_undelivered]). +-define(cfg_root, persistent_session_store). +-define(db_backend_key, [?cfg_root, db_backend]). +-define(is_enabled_key, [?cfg_root, enabled]). +-define(msg_retain, [?cfg_root, max_retain_undelivered]). +-define(on_disc_key, [?cfg_root, on_disc]). + +-define(SESSION_STORE, emqx_session_store). +-define(SESS_MSG_TAB, emqx_session_msg). +-define(MSG_TAB, emqx_persistent_msg). -define(db_backend, (persistent_term:get(?db_backend_key))). diff --git a/apps/emqx/src/persistent_session/emqx_persistent_session_backend_builtin.erl b/apps/emqx/src/persistent_session/emqx_persistent_session_backend_builtin.erl new file mode 100644 index 000000000..f6dde5af4 --- /dev/null +++ b/apps/emqx/src/persistent_session/emqx_persistent_session_backend_builtin.erl @@ -0,0 +1,150 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021-2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_persistent_session_backend_builtin). + +-include("emqx.hrl"). +-include_lib("typerefl/include/types.hrl"). +-include("emqx_persistent_session.hrl"). + +-export([ + create_tables/0, + first_message_id/0, + next_message_id/1, + delete_message/1, + first_session_message/0, + next_session_message/1, + delete_session_message/1, + put_session_store/1, + delete_session_store/1, + lookup_session_store/1, + put_session_message/1, + put_message/1, + get_message/1, + ro_transaction/1 +]). + +-type mria_table_type() :: ram_copies | disc_copies | rocksdb_copies. + +-define(IS_ETS(BACKEND), (BACKEND =:= ram_copies orelse BACKEND =:= disc_copies)). + +create_tables() -> + SessStoreBackend = table_type(session), + ok = mria:create_table(?SESSION_STORE, [ + {type, set}, + {rlog_shard, ?PERSISTENT_SESSION_SHARD}, + {storage, SessStoreBackend}, + {record_name, session_store}, + {attributes, record_info(fields, session_store)}, + {storage_properties, storage_properties(?SESSION_STORE, SessStoreBackend)} + ]), + + SessMsgBackend = table_type(session_messages), + ok = mria:create_table(?SESS_MSG_TAB, [ + {type, ordered_set}, + {rlog_shard, ?PERSISTENT_SESSION_SHARD}, + {storage, SessMsgBackend}, + {record_name, session_msg}, + {attributes, record_info(fields, session_msg)}, + {storage_properties, storage_properties(?SESS_MSG_TAB, SessMsgBackend)} + ]), + + MsgBackend = table_type(messages), + ok = mria:create_table(?MSG_TAB, [ + {type, ordered_set}, + {rlog_shard, ?PERSISTENT_SESSION_SHARD}, + {storage, MsgBackend}, + {record_name, message}, + {attributes, record_info(fields, message)}, + {storage_properties, storage_properties(?MSG_TAB, MsgBackend)} + ]). + +first_session_message() -> + mnesia:dirty_first(?SESS_MSG_TAB). + +next_session_message(Key) -> + mnesia:dirty_next(?SESS_MSG_TAB, Key). + +first_message_id() -> + mnesia:dirty_first(?MSG_TAB). + +next_message_id(Key) -> + mnesia:dirty_next(?MSG_TAB, Key). + +delete_message(Key) -> + mria:dirty_delete(?MSG_TAB, Key). + +delete_session_message(Key) -> + mria:dirty_delete(?SESS_MSG_TAB, Key). + +put_session_store(SS) -> + mria:dirty_write(?SESSION_STORE, SS). + +delete_session_store(ClientID) -> + mria:dirty_delete(?SESSION_STORE, ClientID). + +lookup_session_store(ClientID) -> + case mnesia:dirty_read(?SESSION_STORE, ClientID) of + [] -> none; + [SS] -> {value, SS} + end. + +put_session_message(SessMsg) -> + mria:dirty_write(?SESS_MSG_TAB, SessMsg). + +put_message(Msg) -> + mria:dirty_write(?MSG_TAB, Msg). + +get_message(MsgId) -> + case mnesia:read(?MSG_TAB, MsgId) of + [] -> error({msg_not_found, MsgId}); + [Msg] -> Msg + end. + +ro_transaction(Fun) -> + {atomic, Res} = mria:ro_transaction(?PERSISTENT_SESSION_SHARD, Fun), + Res. + +-spec storage_properties(?SESSION_STORE | ?SESS_MSG_TAB | ?MSG_TAB, mria_table_type()) -> term(). +storage_properties(?SESSION_STORE, Backend) when ?IS_ETS(Backend) -> + [{ets, [{read_concurrency, true}]}]; +storage_properties(_, Backend) when ?IS_ETS(Backend) -> + [ + {ets, [ + {read_concurrency, true}, + {write_concurrency, true} + ]} + ]; +storage_properties(_, _) -> + []. + +-spec table_type(atom()) -> mria_table_type(). +table_type(Table) -> + DiscPersistence = emqx_config:get([?cfg_root, on_disc]), + RamCache = get_overlayed(Table, ram_cache), + case {DiscPersistence, RamCache} of + {true, true} -> + disc_copies; + {true, false} -> + rocksdb_copies; + {false, _} -> + ram_copies + end. + +-spec get_overlayed(atom(), on_disc | ram_cache) -> boolean(). +get_overlayed(Table, Suffix) -> + Default = emqx_config:get([?cfg_root, Suffix]), + emqx_config:get([?cfg_root, backend, Table, Suffix], Default). diff --git a/apps/emqx/src/emqx_persistent_session_dummy_backend.erl b/apps/emqx/src/persistent_session/emqx_persistent_session_backend_dummy.erl similarity index 97% rename from apps/emqx/src/emqx_persistent_session_dummy_backend.erl rename to apps/emqx/src/persistent_session/emqx_persistent_session_backend_dummy.erl index 85b0b99b1..697c46cfe 100644 --- a/apps/emqx/src/emqx_persistent_session_dummy_backend.erl +++ b/apps/emqx/src/persistent_session/emqx_persistent_session_backend_dummy.erl @@ -14,7 +14,7 @@ %% limitations under the License. %%-------------------------------------------------------------------- --module(emqx_persistent_session_dummy_backend). +-module(emqx_persistent_session_backend_dummy). -include("emqx_persistent_session.hrl"). diff --git a/apps/emqx/src/emqx_persistent_session_gc.erl b/apps/emqx/src/persistent_session/emqx_persistent_session_gc.erl similarity index 100% rename from apps/emqx/src/emqx_persistent_session_gc.erl rename to apps/emqx/src/persistent_session/emqx_persistent_session_gc.erl diff --git a/apps/emqx/src/emqx_persistent_session_sup.erl b/apps/emqx/src/persistent_session/emqx_persistent_session_sup.erl similarity index 100% rename from apps/emqx/src/emqx_persistent_session_sup.erl rename to apps/emqx/src/persistent_session/emqx_persistent_session_sup.erl diff --git a/apps/emqx/test/emqx_bpapi_static_checks.erl b/apps/emqx/test/emqx_bpapi_static_checks.erl index 08ecf4632..6be38df0d 100644 --- a/apps/emqx/test/emqx_bpapi_static_checks.erl +++ b/apps/emqx/test/emqx_bpapi_static_checks.erl @@ -168,8 +168,8 @@ typecheck_apis( [ logger:error( "Incompatible RPC call: " - "type of the parameter ~p of RPC call ~s on release ~p " - "is not a subtype of the target function ~s on release ~p.~n" + "type of the parameter ~p of RPC call ~s in release ~p " + "is not a subtype of the target function ~s in release ~p.~n" "Caller type: ~s~nCallee type: ~s~n", [ Var, diff --git a/apps/emqx/test/emqx_persistent_session_SUITE.erl b/apps/emqx/test/emqx_persistent_session_SUITE.erl index ba91409e8..782104455 100644 --- a/apps/emqx/test/emqx_persistent_session_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_session_SUITE.erl @@ -20,7 +20,7 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("../include/emqx.hrl"). --include("../src/emqx_persistent_session.hrl"). +-include("../src/persistent_session/emqx_persistent_session.hrl"). -compile(export_all). -compile(nowarn_export_all). @@ -69,6 +69,12 @@ groups() -> {group, snabbkaffe}, {group, gc_tests} ]}, + {rocks_tables, [], [ + {group, no_kill_connection_process}, + {group, kill_connection_process}, + {group, snabbkaffe}, + {group, gc_tests} + ]}, {no_kill_connection_process, [], [{group, tcp}, {group, quic}, {group, ws}]}, {kill_connection_process, [], [{group, tcp}, {group, quic}, {group, ws}]}, {snabbkaffe, [], [ @@ -101,13 +107,12 @@ init_per_group(Group, Config) when Group =:= ram_tables; Group =:= disc_tables - emqx_common_test_helpers:boot_modules(all), meck:new(emqx_config, [non_strict, passthrough, no_history, no_link]), meck:expect(emqx_config, get, fun - (?storage_type_key) -> Reply; + (?on_disc_key) -> Reply =:= disc; (?is_enabled_key) -> true; (Other) -> meck:passthrough([Other]) end), emqx_common_test_helpers:start_apps([], fun set_special_confs/1), ?assertEqual(true, emqx_persistent_session:is_store_enabled()), - ?assertEqual(Reply, emqx_persistent_session:storage_type()), Config; init_per_group(persistent_store_disabled, Config) -> %% Start Apps @@ -159,22 +164,17 @@ init_per_group(gc_tests, Config) -> end), meck:new(mnesia, [non_strict, passthrough, no_history, no_link]), meck:expect(mnesia, dirty_first, fun - (?SESS_MSG_TAB_RAM) -> ets:first(SessionMsgEts); - (?SESS_MSG_TAB_DISC) -> ets:first(SessionMsgEts); - (?MSG_TAB_RAM) -> ets:first(MsgEts); - (?MSG_TAB_DISC) -> ets:first(MsgEts); + (?SESS_MSG_TAB) -> ets:first(SessionMsgEts); + (?MSG_TAB) -> ets:first(MsgEts); (X) -> meck:passthrough([X]) end), meck:expect(mnesia, dirty_next, fun - (?SESS_MSG_TAB_RAM, X) -> ets:next(SessionMsgEts, X); - (?SESS_MSG_TAB_DISC, X) -> ets:next(SessionMsgEts, X); - (?MSG_TAB_RAM, X) -> ets:next(MsgEts, X); - (?MSG_TAB_DISC, X) -> ets:next(MsgEts, X); + (?SESS_MSG_TAB, X) -> ets:next(SessionMsgEts, X); + (?MSG_TAB, X) -> ets:next(MsgEts, X); (Tab, X) -> meck:passthrough([Tab, X]) end), meck:expect(mnesia, dirty_delete, fun - (?MSG_TAB_RAM, X) -> ets:delete(MsgEts, X); - (?MSG_TAB_DISC, X) -> ets:delete(MsgEts, X); + (?MSG_TAB, X) -> ets:delete(MsgEts, X); (Tab, X) -> meck:passthrough([Tab, X]) end), [{store_owner, Pid}, {session_msg_store, SessionMsgEts}, {msg_store, MsgEts} | Config]. @@ -193,7 +193,9 @@ end_per_group(gc_tests, Config) -> meck:unload(mnesia), ?config(store_owner, Config) ! stop, ok; -end_per_group(Group, _Config) when Group =:= ram_tables; Group =:= disc_tables -> +end_per_group(Group, _Config) when + Group =:= ram_tables; Group =:= disc_tables; Group =:= rocks_tables +-> meck:unload(emqx_config), emqx_common_test_helpers:stop_apps([]); end_per_group(persistent_store_disabled, _Config) ->