From 5221c6b2f9ed8104f0930ce4d0ff05e82cc1795a Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Sun, 8 May 2022 23:20:37 +0200 Subject: [PATCH 1/6] feat(persistent_session): Make schema more flexible --- apps/emqx/i18n/emqx_schema_i18n.conf | 75 +++++++-- ...persistent_session_mnesia_disc_backend.erl | 121 -------------- ..._persistent_session_mnesia_ram_backend.erl | 121 -------------- apps/emqx/src/emqx_schema.erl | 63 +++++++- apps/emqx/src/emqx_trie.erl | 37 ++--- .../emqx_persistent_session.erl | 31 ++-- .../emqx_persistent_session.hrl | 22 ++- ...mqx_persistent_session_backend_builtin.erl | 150 ++++++++++++++++++ ...emqx_persistent_session_backend_dummy.erl} | 2 +- .../emqx_persistent_session_gc.erl | 0 .../emqx_persistent_session_sup.erl | 0 apps/emqx/test/emqx_bpapi_static_checks.erl | 4 +- .../test/emqx_persistent_session_SUITE.erl | 30 ++-- 13 files changed, 328 insertions(+), 328 deletions(-) delete mode 100644 apps/emqx/src/emqx_persistent_session_mnesia_disc_backend.erl delete mode 100644 apps/emqx/src/emqx_persistent_session_mnesia_ram_backend.erl rename apps/emqx/src/{ => persistent_session}/emqx_persistent_session.erl (97%) rename apps/emqx/src/{ => persistent_session}/emqx_persistent_session.hrl (65%) create mode 100644 apps/emqx/src/persistent_session/emqx_persistent_session_backend_builtin.erl rename apps/emqx/src/{emqx_persistent_session_dummy_backend.erl => persistent_session/emqx_persistent_session_backend_dummy.erl} (97%) rename apps/emqx/src/{ => persistent_session}/emqx_persistent_session_gc.erl (100%) rename apps/emqx/src/{ => persistent_session}/emqx_persistent_session_sup.erl (100%) diff --git a/apps/emqx/i18n/emqx_schema_i18n.conf b/apps/emqx/i18n/emqx_schema_i18n.conf index 2f92b9904..d31530810 100644 --- a/apps/emqx/i18n/emqx_schema_i18n.conf +++ b/apps/emqx/i18n/emqx_schema_i18n.conf @@ -509,22 +509,38 @@ emqx_schema { } } - persistent_session_store_storage_type { + persistent_session_store_backend { desc { - en: "Store information about persistent sessions on disc or in ram.\n" - "If ram is chosen, all information about persistent sessions remains\n" - "as long as at least one node in a cluster is alive to keep the information.\n" - "If disc is chosen, the information is persisted on disc and will survive\n" - "cluster restart, at the price of more disc usage and less throughput.\n" - zh: "将有关持久会话的信息存储在磁盘或内存中。\n" - "如果选择了ram,有关持久会话的所有信息将保留\n" - "只要群集中至少有一个节点处于活动状态,就可以保留信息。\n" - "如果选择了光盘,则信息将保留在光盘上,并且将继续存在\n" - "群集重新启动,代价是磁盘使用量增加,吞吐量降低。\n" + en: "Database backend used to store information about the persistent sessions.\n" + "- `builtin`: Use an embedded database (mria)" + zh: "用于存储持久性会话信息的数据库后端\n" + "- `builtin`: 使用一个嵌入式数据库(mria)" } label: { - en: "Storage type" - zh: "存储类型" + en: "Backend" + zh: "后台" + } + } + + persistent_store_on_disc { + desc { + en: "Persist session data on disc. If `false`, the data will be stored in RAM." + zh: "将会话数据保留在磁盘上。如果`false`,数据将被存储在RAM中。" + } + label: { + en: "Persist on disc" + zh: "坚持 在光盘上" + } + } + + persistent_store_ram_cache { + desc { + en: "Maintain a copy of the data in RAM for faster access." + zh: "在RAM中保持一份数据的副本,以便更快地访问。" + } + label: { + en: "RAM cache" + zh: "RAM缓存" } } @@ -569,6 +585,39 @@ emqx_schema { } } + persistent_session_builtin_session_table { + desc { + en: "Tuning for built-in session table." + zh: "对内置的会话表进行调整。" + } + label: { + en: "Persistent session." + zh: "持久会话。" + } + } + + persistent_session_builtin_sess_msg_table { + desc { + en: "Tuning for built-in session messages table." + zh: "对内置的会话信息表进行调整。" + } + label: { + en: "Persistent session messages." + zh: "持久的会话信息。" + } + } + + persistent_session_builtin_messages_table { + desc { + en: "Tuning for built-in messages table." + zh: "对内置信息表进行调校。" + } + label: { + en: "Persistent messages." + zh: "持久性信息。" + } + } + stats_enable { desc { en: "Enable/disable statistic data collection." diff --git a/apps/emqx/src/emqx_persistent_session_mnesia_disc_backend.erl b/apps/emqx/src/emqx_persistent_session_mnesia_disc_backend.erl deleted file mode 100644 index 3a4dd5b56..000000000 --- a/apps/emqx/src/emqx_persistent_session_mnesia_disc_backend.erl +++ /dev/null @@ -1,121 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2021-2022 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_persistent_session_mnesia_disc_backend). - --include("emqx.hrl"). --include("emqx_persistent_session.hrl"). - --export([ - create_tables/0, - first_message_id/0, - next_message_id/1, - delete_message/1, - first_session_message/0, - next_session_message/1, - delete_session_message/1, - put_session_store/1, - delete_session_store/1, - lookup_session_store/1, - put_session_message/1, - put_message/1, - get_message/1, - ro_transaction/1 -]). - -create_tables() -> - ok = mria:create_table(?SESSION_STORE_DISC, [ - {type, set}, - {rlog_shard, ?PERSISTENT_SESSION_SHARD}, - {storage, disc_copies}, - {record_name, session_store}, - {attributes, record_info(fields, session_store)}, - {storage_properties, [{ets, [{read_concurrency, true}]}]} - ]), - - ok = mria:create_table(?SESS_MSG_TAB_DISC, [ - {type, ordered_set}, - {rlog_shard, ?PERSISTENT_SESSION_SHARD}, - {storage, disc_copies}, - {record_name, session_msg}, - {attributes, record_info(fields, session_msg)}, - {storage_properties, [ - {ets, [ - {read_concurrency, true}, - {write_concurrency, true} - ]} - ]} - ]), - - ok = mria:create_table(?MSG_TAB_DISC, [ - {type, ordered_set}, - {rlog_shard, ?PERSISTENT_SESSION_SHARD}, - {storage, disc_copies}, - {record_name, message}, - {attributes, record_info(fields, message)}, - {storage_properties, [ - {ets, [ - {read_concurrency, true}, - {write_concurrency, true} - ]} - ]} - ]). - -first_session_message() -> - mnesia:dirty_first(?SESS_MSG_TAB_DISC). - -next_session_message(Key) -> - mnesia:dirty_next(?SESS_MSG_TAB_DISC, Key). - -first_message_id() -> - mnesia:dirty_first(?MSG_TAB_DISC). - -next_message_id(Key) -> - mnesia:dirty_next(?MSG_TAB_DISC, Key). - -delete_message(Key) -> - mria:dirty_delete(?MSG_TAB_DISC, Key). - -delete_session_message(Key) -> - mria:dirty_delete(?SESS_MSG_TAB_DISC, Key). - -put_session_store(SS) -> - mria:dirty_write(?SESSION_STORE_DISC, SS). - -delete_session_store(ClientID) -> - mria:dirty_delete(?SESSION_STORE_DISC, ClientID). - -lookup_session_store(ClientID) -> - case mnesia:dirty_read(?SESSION_STORE_DISC, ClientID) of - [] -> none; - [SS] -> {value, SS} - end. - -put_session_message(SessMsg) -> - mria:dirty_write(?SESS_MSG_TAB_DISC, SessMsg). - -put_message(Msg) -> - mria:dirty_write(?MSG_TAB_DISC, Msg). - -get_message(MsgId) -> - case mnesia:read(?MSG_TAB_DISC, MsgId) of - [] -> error({msg_not_found, MsgId}); - [Msg] -> Msg - end. - -ro_transaction(Fun) -> - {atomic, Res} = mria:ro_transaction(?PERSISTENT_SESSION_SHARD, Fun), - Res. diff --git a/apps/emqx/src/emqx_persistent_session_mnesia_ram_backend.erl b/apps/emqx/src/emqx_persistent_session_mnesia_ram_backend.erl deleted file mode 100644 index f6d956079..000000000 --- a/apps/emqx/src/emqx_persistent_session_mnesia_ram_backend.erl +++ /dev/null @@ -1,121 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2021-2022 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_persistent_session_mnesia_ram_backend). - --include("emqx.hrl"). --include("emqx_persistent_session.hrl"). - --export([ - create_tables/0, - first_message_id/0, - next_message_id/1, - delete_message/1, - first_session_message/0, - next_session_message/1, - delete_session_message/1, - put_session_store/1, - delete_session_store/1, - lookup_session_store/1, - put_session_message/1, - put_message/1, - get_message/1, - ro_transaction/1 -]). - -create_tables() -> - ok = mria:create_table(?SESSION_STORE_RAM, [ - {type, set}, - {rlog_shard, ?PERSISTENT_SESSION_SHARD}, - {storage, ram_copies}, - {record_name, session_store}, - {attributes, record_info(fields, session_store)}, - {storage_properties, [{ets, [{read_concurrency, true}]}]} - ]), - - ok = mria:create_table(?SESS_MSG_TAB_RAM, [ - {type, ordered_set}, - {rlog_shard, ?PERSISTENT_SESSION_SHARD}, - {storage, ram_copies}, - {record_name, session_msg}, - {attributes, record_info(fields, session_msg)}, - {storage_properties, [ - {ets, [ - {read_concurrency, true}, - {write_concurrency, true} - ]} - ]} - ]), - - ok = mria:create_table(?MSG_TAB_RAM, [ - {type, ordered_set}, - {rlog_shard, ?PERSISTENT_SESSION_SHARD}, - {storage, ram_copies}, - {record_name, message}, - {attributes, record_info(fields, message)}, - {storage_properties, [ - {ets, [ - {read_concurrency, true}, - {write_concurrency, true} - ]} - ]} - ]). - -first_session_message() -> - mnesia:dirty_first(?SESS_MSG_TAB_RAM). - -next_session_message(Key) -> - mnesia:dirty_next(?SESS_MSG_TAB_RAM, Key). - -first_message_id() -> - mnesia:dirty_first(?MSG_TAB_RAM). - -next_message_id(Key) -> - mnesia:dirty_next(?MSG_TAB_RAM, Key). - -delete_message(Key) -> - mria:dirty_delete(?MSG_TAB_RAM, Key). - -delete_session_message(Key) -> - mria:dirty_delete(?SESS_MSG_TAB_RAM, Key). - -put_session_store(SS) -> - mria:dirty_write(?SESSION_STORE_RAM, SS). - -delete_session_store(ClientID) -> - mria:dirty_delete(?SESSION_STORE_RAM, ClientID). - -lookup_session_store(ClientID) -> - case mnesia:dirty_read(?SESSION_STORE_RAM, ClientID) of - [] -> none; - [SS] -> {value, SS} - end. - -put_session_message(SessMsg) -> - mria:dirty_write(?SESS_MSG_TAB_RAM, SessMsg). - -put_message(Msg) -> - mria:dirty_write(?MSG_TAB_RAM, Msg). - -get_message(MsgId) -> - case mnesia:read(?MSG_TAB_RAM, MsgId) of - [] -> error({msg_not_found, MsgId}); - [Msg] -> Msg - end. - -ro_transaction(Fun) -> - {atomic, Res} = mria:ro_transaction(?PERSISTENT_SESSION_SHARD, Fun), - Res. diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 46949cc4e..039808c4b 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -212,12 +212,36 @@ fields("persistent_session_store") -> desc => ?DESC(persistent_session_store_enabled) } )}, - {"storage_type", + {"on_disc", sc( - hoconsc:union([ram, disc]), + boolean(), #{ - default => disc, - desc => ?DESC(persistent_session_store_storage_type) + default => true, + desc => ?DESC(persistent_store_on_disc) + } + )}, + {"ram_cache", + sc( + boolean(), + #{ + default => false, + desc => ?DESC(persistent_store_ram_cache) + } + )}, + {"backend", + sc( + hoconsc:union([ref("persistent_session_builtin")]), + #{ + default => #{ + <<"type">> => <<"builtin">>, + <<"session">> => + #{<<"ram_cache">> => <<"true">>}, + <<"session_messages">> => + #{<<"ram_cache">> => <<"true">>}, + <<"messages">> => + #{<<"ram_cache">> => <<"false">>} + }, + desc => ?DESC(persistent_session_store_backend) } )}, {"max_retain_undelivered", @@ -245,6 +269,33 @@ fields("persistent_session_store") -> } )} ]; +fields("persistent_table_mria_opts") -> + [ + {"ram_cache", + sc( + boolean(), + #{ + default => true, + desc => ?DESC(persistent_store_ram_cache) + } + )} + ]; +fields("persistent_session_builtin") -> + [ + {"type", sc(hoconsc:enum([builtin]), #{default => builtin, desc => ""})}, + {"session", + sc(ref("persistent_table_mria_opts"), #{ + desc => ?DESC(persistent_session_builtin_session_table) + })}, + {"session_messages", + sc(ref("persistent_table_mria_opts"), #{ + desc => ?DESC(persistent_session_builtin_sess_msg_table) + })}, + {"messages", + sc(ref("persistent_table_mria_opts"), #{ + desc => ?DESC(persistent_session_builtin_messages_table) + })} + ]; fields("stats") -> [ {"enable", @@ -1526,6 +1577,10 @@ base_listener() -> desc("persistent_session_store") -> "Settings for message persistence."; +desc("persistent_session_builtin") -> + "Settings for the built-in storage engine of persistent messages."; +desc("persistent_table_mria_opts") -> + "Tuning options for the mria table."; desc("stats") -> "Enable/disable statistic data collection.\n" "Statistic data such as message receive/send count/rate etc. " diff --git a/apps/emqx/src/emqx_trie.erl b/apps/emqx/src/emqx_trie.erl index 013cb8f9b..30725be6f 100644 --- a/apps/emqx/src/emqx_trie.erl +++ b/apps/emqx/src/emqx_trie.erl @@ -50,8 +50,7 @@ -compile(nowarn_export_all). -endif. --define(SESSION_DISC_TRIE, emqx_session_trie_disc). --define(SESSION_RAM_TRIE, emqx_session_trie_ram). +-define(SESSION_TRIE, emqx_session_trie). -define(PREFIX(Prefix), {Prefix, 0}). -define(TOPIC(Topic), {Topic, 1}). @@ -82,7 +81,12 @@ mnesia(boot) -> {storage_properties, StoreProps} ]). -create_session_trie(disc) -> +create_session_trie(Type) -> + Storage = + case Type of + disc -> disc_copies; + ram -> ram_copies + end, StoreProps = [ {ets, [ {read_concurrency, true}, @@ -90,28 +94,10 @@ create_session_trie(disc) -> ]} ], ok = mria:create_table( - ?SESSION_DISC_TRIE, + ?SESSION_TRIE, [ {rlog_shard, ?ROUTE_SHARD}, - {storage, disc_copies}, - {record_name, ?TRIE}, - {attributes, record_info(fields, ?TRIE)}, - {type, ordered_set}, - {storage_properties, StoreProps} - ] - ); -create_session_trie(ram) -> - StoreProps = [ - {ets, [ - {read_concurrency, true}, - {write_concurrency, true} - ]} - ], - ok = mria:create_table( - ?SESSION_RAM_TRIE, - [ - {rlog_shard, ?ROUTE_SHARD}, - {storage, ram_copies}, + {storage, Storage}, {record_name, ?TRIE}, {attributes, record_info(fields, ?TRIE)}, {type, ordered_set}, @@ -204,10 +190,7 @@ lock_session_tables() -> %%-------------------------------------------------------------------- session_trie() -> - case emqx_persistent_session:storage_type() of - disc -> ?SESSION_DISC_TRIE; - ram -> ?SESSION_RAM_TRIE - end. + ?SESSION_TRIE. make_keys(Topic) -> Words = emqx_topic:words(Topic), diff --git a/apps/emqx/src/emqx_persistent_session.erl b/apps/emqx/src/persistent_session/emqx_persistent_session.erl similarity index 97% rename from apps/emqx/src/emqx_persistent_session.erl rename to apps/emqx/src/persistent_session/emqx_persistent_session.erl index 52cbbd3a9..527206674 100644 --- a/apps/emqx/src/emqx_persistent_session.erl +++ b/apps/emqx/src/persistent_session/emqx_persistent_session.erl @@ -19,6 +19,7 @@ -export([ is_store_enabled/0, init_db_backend/0, + storage_backend/0, storage_type/0 ]). @@ -81,6 +82,9 @@ -type gc_traverse_fun() :: fun(('delete' | 'marker' | 'abandoned', sess_msg_key()) -> 'ok'). +%% EMQX configuration keys +-define(conf_storage_backend, [persistent_session_store, backend, type]). + %%-------------------------------------------------------------------- %% Init %%-------------------------------------------------------------------- @@ -91,27 +95,23 @@ init_db_backend() -> StorageType = storage_type(), ok = emqx_trie:create_session_trie(StorageType), ok = emqx_session_router:create_router_tab(StorageType), - case StorageType of - disc -> - emqx_persistent_session_mnesia_disc_backend:create_tables(), - persistent_term:put( - ?db_backend_key, emqx_persistent_session_mnesia_disc_backend - ); - ram -> - emqx_persistent_session_mnesia_ram_backend:create_tables(), - persistent_term:put(?db_backend_key, emqx_persistent_session_mnesia_ram_backend) + case storage_backend() of + builtin -> + emqx_persistent_session_backend_builtin:create_tables(), + persistent_term:put(?db_backend_key, emqx_persistent_session_backend_builtin) end, ok; false -> - persistent_term:put(?db_backend_key, emqx_persistent_session_dummy_backend), + persistent_term:put(?db_backend_key, emqx_persistent_session_backend_dummy), ok end. is_store_enabled() -> emqx_config:get(?is_enabled_key). -storage_type() -> - emqx_config:get(?storage_type_key). +-spec storage_backend() -> builtin. +storage_backend() -> + emqx_config:get(?conf_storage_backend). %%-------------------------------------------------------------------- %% Session message ADT API @@ -557,3 +557,10 @@ gc_traverse({S, _MsgID, _STopic, ?DELIVERED} = Key, SessionID, Abandoned, Fun) - %% We have a message that is marked as ?DELIVERED, but the ?UNDELIVERED is missing. NewAbandoned = S =:= SessionID andalso Abandoned, gc_traverse(next_session_message(Key), S, NewAbandoned, Fun). + +-spec storage_type() -> ram | disc. +storage_type() -> + case emqx_config:get(?on_disc_key) of + true -> disc; + false -> ram + end. diff --git a/apps/emqx/src/emqx_persistent_session.hrl b/apps/emqx/src/persistent_session/emqx_persistent_session.hrl similarity index 65% rename from apps/emqx/src/emqx_persistent_session.hrl rename to apps/emqx/src/persistent_session/emqx_persistent_session.hrl index 094963163..03b823401 100644 --- a/apps/emqx/src/emqx_persistent_session.hrl +++ b/apps/emqx/src/persistent_session/emqx_persistent_session.hrl @@ -14,15 +14,6 @@ %% limitations under the License. %%-------------------------------------------------------------------- --define(SESSION_STORE_DISC, emqx_session_store_disc). --define(SESSION_STORE_RAM, emqx_session_store_ram). - --define(SESS_MSG_TAB_DISC, emqx_session_msg_disc). --define(SESS_MSG_TAB_RAM, emqx_session_msg_ram). - --define(MSG_TAB_DISC, emqx_persistent_msg_disc). --define(MSG_TAB_RAM, emqx_persistent_msg_ram). - -record(session_store, { client_id :: binary(), expiry_interval :: non_neg_integer(), @@ -35,9 +26,14 @@ val = [] :: [] }). --define(db_backend_key, [persistent_session_store, db_backend]). --define(is_enabled_key, [persistent_session_store, enabled]). --define(storage_type_key, [persistent_session_store, storage_type]). --define(msg_retain, [persistent_session_store, max_retain_undelivered]). +-define(cfg_root, persistent_session_store). +-define(db_backend_key, [?cfg_root, db_backend]). +-define(is_enabled_key, [?cfg_root, enabled]). +-define(msg_retain, [?cfg_root, max_retain_undelivered]). +-define(on_disc_key, [?cfg_root, on_disc]). + +-define(SESSION_STORE, emqx_session_store). +-define(SESS_MSG_TAB, emqx_session_msg). +-define(MSG_TAB, emqx_persistent_msg). -define(db_backend, (persistent_term:get(?db_backend_key))). diff --git a/apps/emqx/src/persistent_session/emqx_persistent_session_backend_builtin.erl b/apps/emqx/src/persistent_session/emqx_persistent_session_backend_builtin.erl new file mode 100644 index 000000000..f6dde5af4 --- /dev/null +++ b/apps/emqx/src/persistent_session/emqx_persistent_session_backend_builtin.erl @@ -0,0 +1,150 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021-2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_persistent_session_backend_builtin). + +-include("emqx.hrl"). +-include_lib("typerefl/include/types.hrl"). +-include("emqx_persistent_session.hrl"). + +-export([ + create_tables/0, + first_message_id/0, + next_message_id/1, + delete_message/1, + first_session_message/0, + next_session_message/1, + delete_session_message/1, + put_session_store/1, + delete_session_store/1, + lookup_session_store/1, + put_session_message/1, + put_message/1, + get_message/1, + ro_transaction/1 +]). + +-type mria_table_type() :: ram_copies | disc_copies | rocksdb_copies. + +-define(IS_ETS(BACKEND), (BACKEND =:= ram_copies orelse BACKEND =:= disc_copies)). + +create_tables() -> + SessStoreBackend = table_type(session), + ok = mria:create_table(?SESSION_STORE, [ + {type, set}, + {rlog_shard, ?PERSISTENT_SESSION_SHARD}, + {storage, SessStoreBackend}, + {record_name, session_store}, + {attributes, record_info(fields, session_store)}, + {storage_properties, storage_properties(?SESSION_STORE, SessStoreBackend)} + ]), + + SessMsgBackend = table_type(session_messages), + ok = mria:create_table(?SESS_MSG_TAB, [ + {type, ordered_set}, + {rlog_shard, ?PERSISTENT_SESSION_SHARD}, + {storage, SessMsgBackend}, + {record_name, session_msg}, + {attributes, record_info(fields, session_msg)}, + {storage_properties, storage_properties(?SESS_MSG_TAB, SessMsgBackend)} + ]), + + MsgBackend = table_type(messages), + ok = mria:create_table(?MSG_TAB, [ + {type, ordered_set}, + {rlog_shard, ?PERSISTENT_SESSION_SHARD}, + {storage, MsgBackend}, + {record_name, message}, + {attributes, record_info(fields, message)}, + {storage_properties, storage_properties(?MSG_TAB, MsgBackend)} + ]). + +first_session_message() -> + mnesia:dirty_first(?SESS_MSG_TAB). + +next_session_message(Key) -> + mnesia:dirty_next(?SESS_MSG_TAB, Key). + +first_message_id() -> + mnesia:dirty_first(?MSG_TAB). + +next_message_id(Key) -> + mnesia:dirty_next(?MSG_TAB, Key). + +delete_message(Key) -> + mria:dirty_delete(?MSG_TAB, Key). + +delete_session_message(Key) -> + mria:dirty_delete(?SESS_MSG_TAB, Key). + +put_session_store(SS) -> + mria:dirty_write(?SESSION_STORE, SS). + +delete_session_store(ClientID) -> + mria:dirty_delete(?SESSION_STORE, ClientID). + +lookup_session_store(ClientID) -> + case mnesia:dirty_read(?SESSION_STORE, ClientID) of + [] -> none; + [SS] -> {value, SS} + end. + +put_session_message(SessMsg) -> + mria:dirty_write(?SESS_MSG_TAB, SessMsg). + +put_message(Msg) -> + mria:dirty_write(?MSG_TAB, Msg). + +get_message(MsgId) -> + case mnesia:read(?MSG_TAB, MsgId) of + [] -> error({msg_not_found, MsgId}); + [Msg] -> Msg + end. + +ro_transaction(Fun) -> + {atomic, Res} = mria:ro_transaction(?PERSISTENT_SESSION_SHARD, Fun), + Res. + +-spec storage_properties(?SESSION_STORE | ?SESS_MSG_TAB | ?MSG_TAB, mria_table_type()) -> term(). +storage_properties(?SESSION_STORE, Backend) when ?IS_ETS(Backend) -> + [{ets, [{read_concurrency, true}]}]; +storage_properties(_, Backend) when ?IS_ETS(Backend) -> + [ + {ets, [ + {read_concurrency, true}, + {write_concurrency, true} + ]} + ]; +storage_properties(_, _) -> + []. + +-spec table_type(atom()) -> mria_table_type(). +table_type(Table) -> + DiscPersistence = emqx_config:get([?cfg_root, on_disc]), + RamCache = get_overlayed(Table, ram_cache), + case {DiscPersistence, RamCache} of + {true, true} -> + disc_copies; + {true, false} -> + rocksdb_copies; + {false, _} -> + ram_copies + end. + +-spec get_overlayed(atom(), on_disc | ram_cache) -> boolean(). +get_overlayed(Table, Suffix) -> + Default = emqx_config:get([?cfg_root, Suffix]), + emqx_config:get([?cfg_root, backend, Table, Suffix], Default). diff --git a/apps/emqx/src/emqx_persistent_session_dummy_backend.erl b/apps/emqx/src/persistent_session/emqx_persistent_session_backend_dummy.erl similarity index 97% rename from apps/emqx/src/emqx_persistent_session_dummy_backend.erl rename to apps/emqx/src/persistent_session/emqx_persistent_session_backend_dummy.erl index 85b0b99b1..697c46cfe 100644 --- a/apps/emqx/src/emqx_persistent_session_dummy_backend.erl +++ b/apps/emqx/src/persistent_session/emqx_persistent_session_backend_dummy.erl @@ -14,7 +14,7 @@ %% limitations under the License. %%-------------------------------------------------------------------- --module(emqx_persistent_session_dummy_backend). +-module(emqx_persistent_session_backend_dummy). -include("emqx_persistent_session.hrl"). diff --git a/apps/emqx/src/emqx_persistent_session_gc.erl b/apps/emqx/src/persistent_session/emqx_persistent_session_gc.erl similarity index 100% rename from apps/emqx/src/emqx_persistent_session_gc.erl rename to apps/emqx/src/persistent_session/emqx_persistent_session_gc.erl diff --git a/apps/emqx/src/emqx_persistent_session_sup.erl b/apps/emqx/src/persistent_session/emqx_persistent_session_sup.erl similarity index 100% rename from apps/emqx/src/emqx_persistent_session_sup.erl rename to apps/emqx/src/persistent_session/emqx_persistent_session_sup.erl diff --git a/apps/emqx/test/emqx_bpapi_static_checks.erl b/apps/emqx/test/emqx_bpapi_static_checks.erl index 08ecf4632..6be38df0d 100644 --- a/apps/emqx/test/emqx_bpapi_static_checks.erl +++ b/apps/emqx/test/emqx_bpapi_static_checks.erl @@ -168,8 +168,8 @@ typecheck_apis( [ logger:error( "Incompatible RPC call: " - "type of the parameter ~p of RPC call ~s on release ~p " - "is not a subtype of the target function ~s on release ~p.~n" + "type of the parameter ~p of RPC call ~s in release ~p " + "is not a subtype of the target function ~s in release ~p.~n" "Caller type: ~s~nCallee type: ~s~n", [ Var, diff --git a/apps/emqx/test/emqx_persistent_session_SUITE.erl b/apps/emqx/test/emqx_persistent_session_SUITE.erl index ba91409e8..782104455 100644 --- a/apps/emqx/test/emqx_persistent_session_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_session_SUITE.erl @@ -20,7 +20,7 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("../include/emqx.hrl"). --include("../src/emqx_persistent_session.hrl"). +-include("../src/persistent_session/emqx_persistent_session.hrl"). -compile(export_all). -compile(nowarn_export_all). @@ -69,6 +69,12 @@ groups() -> {group, snabbkaffe}, {group, gc_tests} ]}, + {rocks_tables, [], [ + {group, no_kill_connection_process}, + {group, kill_connection_process}, + {group, snabbkaffe}, + {group, gc_tests} + ]}, {no_kill_connection_process, [], [{group, tcp}, {group, quic}, {group, ws}]}, {kill_connection_process, [], [{group, tcp}, {group, quic}, {group, ws}]}, {snabbkaffe, [], [ @@ -101,13 +107,12 @@ init_per_group(Group, Config) when Group =:= ram_tables; Group =:= disc_tables - emqx_common_test_helpers:boot_modules(all), meck:new(emqx_config, [non_strict, passthrough, no_history, no_link]), meck:expect(emqx_config, get, fun - (?storage_type_key) -> Reply; + (?on_disc_key) -> Reply =:= disc; (?is_enabled_key) -> true; (Other) -> meck:passthrough([Other]) end), emqx_common_test_helpers:start_apps([], fun set_special_confs/1), ?assertEqual(true, emqx_persistent_session:is_store_enabled()), - ?assertEqual(Reply, emqx_persistent_session:storage_type()), Config; init_per_group(persistent_store_disabled, Config) -> %% Start Apps @@ -159,22 +164,17 @@ init_per_group(gc_tests, Config) -> end), meck:new(mnesia, [non_strict, passthrough, no_history, no_link]), meck:expect(mnesia, dirty_first, fun - (?SESS_MSG_TAB_RAM) -> ets:first(SessionMsgEts); - (?SESS_MSG_TAB_DISC) -> ets:first(SessionMsgEts); - (?MSG_TAB_RAM) -> ets:first(MsgEts); - (?MSG_TAB_DISC) -> ets:first(MsgEts); + (?SESS_MSG_TAB) -> ets:first(SessionMsgEts); + (?MSG_TAB) -> ets:first(MsgEts); (X) -> meck:passthrough([X]) end), meck:expect(mnesia, dirty_next, fun - (?SESS_MSG_TAB_RAM, X) -> ets:next(SessionMsgEts, X); - (?SESS_MSG_TAB_DISC, X) -> ets:next(SessionMsgEts, X); - (?MSG_TAB_RAM, X) -> ets:next(MsgEts, X); - (?MSG_TAB_DISC, X) -> ets:next(MsgEts, X); + (?SESS_MSG_TAB, X) -> ets:next(SessionMsgEts, X); + (?MSG_TAB, X) -> ets:next(MsgEts, X); (Tab, X) -> meck:passthrough([Tab, X]) end), meck:expect(mnesia, dirty_delete, fun - (?MSG_TAB_RAM, X) -> ets:delete(MsgEts, X); - (?MSG_TAB_DISC, X) -> ets:delete(MsgEts, X); + (?MSG_TAB, X) -> ets:delete(MsgEts, X); (Tab, X) -> meck:passthrough([Tab, X]) end), [{store_owner, Pid}, {session_msg_store, SessionMsgEts}, {msg_store, MsgEts} | Config]. @@ -193,7 +193,9 @@ end_per_group(gc_tests, Config) -> meck:unload(mnesia), ?config(store_owner, Config) ! stop, ok; -end_per_group(Group, _Config) when Group =:= ram_tables; Group =:= disc_tables -> +end_per_group(Group, _Config) when + Group =:= ram_tables; Group =:= disc_tables; Group =:= rocks_tables +-> meck:unload(emqx_config), emqx_common_test_helpers:stop_apps([]); end_per_group(persistent_store_disabled, _Config) -> From 39aa7de88f65571e50a4cfe44d27810e20f15b4b Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Mon, 9 May 2022 10:18:03 +0200 Subject: [PATCH 2/6] fix(grammar): Fix Chinese translations Co-authored-by: JianBo He --- apps/emqx/i18n/emqx_schema_i18n.conf | 20 +++++++++---------- .../test/emqx_persistent_session_SUITE.erl | 8 +------- 2 files changed, 11 insertions(+), 17 deletions(-) diff --git a/apps/emqx/i18n/emqx_schema_i18n.conf b/apps/emqx/i18n/emqx_schema_i18n.conf index d31530810..839e5dfeb 100644 --- a/apps/emqx/i18n/emqx_schema_i18n.conf +++ b/apps/emqx/i18n/emqx_schema_i18n.conf @@ -514,33 +514,33 @@ emqx_schema { en: "Database backend used to store information about the persistent sessions.\n" "- `builtin`: Use an embedded database (mria)" zh: "用于存储持久性会话信息的数据库后端\n" - "- `builtin`: 使用一个嵌入式数据库(mria)" + "- `builtin`: 使用内置的数据库(mria)" } label: { en: "Backend" - zh: "后台" + zh: "后端类型" } } persistent_store_on_disc { desc { en: "Persist session data on disc. If `false`, the data will be stored in RAM." - zh: "将会话数据保留在磁盘上。如果`false`,数据将被存储在RAM中。" + zh: "将会话数据保存在磁盘上。如果为 `false` 则存储在内存中。" } label: { en: "Persist on disc" - zh: "坚持 在光盘上" + zh: "持久化在磁盘上" } } persistent_store_ram_cache { desc { en: "Maintain a copy of the data in RAM for faster access." - zh: "在RAM中保持一份数据的副本,以便更快地访问。" + zh: "在内存中保持一份数据的副本,以便更快地访问。" } label: { en: "RAM cache" - zh: "RAM缓存" + zh: "内存缓存" } } @@ -603,18 +603,18 @@ emqx_schema { } label: { en: "Persistent session messages." - zh: "持久的会话信息。" + zh: "持久化会话消息" } } persistent_session_builtin_messages_table { desc { en: "Tuning for built-in messages table." - zh: "对内置信息表进行调校。" + zh: "优化内置消息表配置。" } label: { - en: "Persistent messages." - zh: "持久性信息。" + en: "Persistent messages" + zh: "持久化消息" } } diff --git a/apps/emqx/test/emqx_persistent_session_SUITE.erl b/apps/emqx/test/emqx_persistent_session_SUITE.erl index 782104455..b8830dc0d 100644 --- a/apps/emqx/test/emqx_persistent_session_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_session_SUITE.erl @@ -69,12 +69,6 @@ groups() -> {group, snabbkaffe}, {group, gc_tests} ]}, - {rocks_tables, [], [ - {group, no_kill_connection_process}, - {group, kill_connection_process}, - {group, snabbkaffe}, - {group, gc_tests} - ]}, {no_kill_connection_process, [], [{group, tcp}, {group, quic}, {group, ws}]}, {kill_connection_process, [], [{group, tcp}, {group, quic}, {group, ws}]}, {snabbkaffe, [], [ @@ -194,7 +188,7 @@ end_per_group(gc_tests, Config) -> ?config(store_owner, Config) ! stop, ok; end_per_group(Group, _Config) when - Group =:= ram_tables; Group =:= disc_tables; Group =:= rocks_tables + Group =:= ram_tables; Group =:= disc_tables -> meck:unload(emqx_config), emqx_common_test_helpers:stop_apps([]); From a1285cd30825603e2c0686c05177a29e7a8b5201 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Mon, 9 May 2022 10:57:10 +0200 Subject: [PATCH 3/6] fix(persistent_sessions): Enhance documentation --- apps/emqx/i18n/emqx_schema_i18n.conf | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/apps/emqx/i18n/emqx_schema_i18n.conf b/apps/emqx/i18n/emqx_schema_i18n.conf index 839e5dfeb..bb1a2dc36 100644 --- a/apps/emqx/i18n/emqx_schema_i18n.conf +++ b/apps/emqx/i18n/emqx_schema_i18n.conf @@ -511,8 +511,8 @@ emqx_schema { persistent_session_store_backend { desc { - en: "Database backend used to store information about the persistent sessions.\n" - "- `builtin`: Use an embedded database (mria)" + en: "Database management system used to store information about persistent sessions and messages.\n" + "- `builtin`: Use the embedded database (mria)" zh: "用于存储持久性会话信息的数据库后端\n" "- `builtin`: 使用内置的数据库(mria)" } @@ -524,7 +524,9 @@ emqx_schema { persistent_store_on_disc { desc { - en: "Persist session data on disc. If `false`, the data will be stored in RAM." + en: "Save information about the persistent sessions on disc.\n" + "If this option is enabled, persistent sessions will survive full restart of the cluster.\n" + "Otherwise, all the data will be stored in RAM, and it will be lost when all the nodes in the cluster are stopped." zh: "将会话数据保存在磁盘上。如果为 `false` 则存储在内存中。" } label: { @@ -587,7 +589,7 @@ emqx_schema { persistent_session_builtin_session_table { desc { - en: "Tuning for built-in session table." + en: "Performance tuning options for built-in session table." zh: "对内置的会话表进行调整。" } label: { @@ -598,7 +600,7 @@ emqx_schema { persistent_session_builtin_sess_msg_table { desc { - en: "Tuning for built-in session messages table." + en: "Performance tuning options for built-in session messages table." zh: "对内置的会话信息表进行调整。" } label: { @@ -609,7 +611,7 @@ emqx_schema { persistent_session_builtin_messages_table { desc { - en: "Tuning for built-in messages table." + en: "Performance tuning options for built-in messages table." zh: "优化内置消息表配置。" } label: { From 03cfb67a4529beb24af38ade984449260694aa44 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Tue, 10 May 2022 11:21:08 +0200 Subject: [PATCH 4/6] docs(persistent_session): Fix Chinese translation --- apps/emqx/i18n/emqx_schema_i18n.conf | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/apps/emqx/i18n/emqx_schema_i18n.conf b/apps/emqx/i18n/emqx_schema_i18n.conf index bb1a2dc36..31bb9230c 100644 --- a/apps/emqx/i18n/emqx_schema_i18n.conf +++ b/apps/emqx/i18n/emqx_schema_i18n.conf @@ -513,7 +513,7 @@ emqx_schema { desc { en: "Database management system used to store information about persistent sessions and messages.\n" "- `builtin`: Use the embedded database (mria)" - zh: "用于存储持久性会话信息的数据库后端\n" + zh: "数据库管理系统,用于存储有关持久性会话和消息的信息\n" "- `builtin`: 使用内置的数据库(mria)" } label: { @@ -527,7 +527,9 @@ emqx_schema { en: "Save information about the persistent sessions on disc.\n" "If this option is enabled, persistent sessions will survive full restart of the cluster.\n" "Otherwise, all the data will be stored in RAM, and it will be lost when all the nodes in the cluster are stopped." - zh: "将会话数据保存在磁盘上。如果为 `false` 则存储在内存中。" + zh: "在磁盘上保存关于持久化会话的信息。" + "如果启用了这个选项,持久化会话将在集群完全重启后继续存在。" + "否则,所有的数据都将存储在内存中,当集群中的所有节点都停止时,它就会丢失。" } label: { en: "Persist on disc" @@ -593,8 +595,8 @@ emqx_schema { zh: "对内置的会话表进行调整。" } label: { - en: "Persistent session." - zh: "持久会话。" + en: "Persistent session" + zh: "持久会话" } } @@ -604,7 +606,7 @@ emqx_schema { zh: "对内置的会话信息表进行调整。" } label: { - en: "Persistent session messages." + en: "Persistent session messages" zh: "持久化会话消息" } } From b06c4cb6cc00280eb36290c2f1767a9834c870d2 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Tue, 10 May 2022 11:27:51 +0200 Subject: [PATCH 5/6] docs(persistent_session): Apply suggestions from code review Co-authored-by: JianBo He --- apps/emqx/i18n/emqx_schema_i18n.conf | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/emqx/i18n/emqx_schema_i18n.conf b/apps/emqx/i18n/emqx_schema_i18n.conf index 31bb9230c..e10d5ac9b 100644 --- a/apps/emqx/i18n/emqx_schema_i18n.conf +++ b/apps/emqx/i18n/emqx_schema_i18n.conf @@ -592,7 +592,7 @@ emqx_schema { persistent_session_builtin_session_table { desc { en: "Performance tuning options for built-in session table." - zh: "对内置的会话表进行调整。" + zh: "优化内置会话表的配置。" } label: { en: "Persistent session" @@ -603,7 +603,7 @@ emqx_schema { persistent_session_builtin_sess_msg_table { desc { en: "Performance tuning options for built-in session messages table." - zh: "对内置的会话信息表进行调整。" + zh: "优化内置的会话消息表的配置。" } label: { en: "Persistent session messages" From 8dbd080e459b2c984b628f8b6b05eebd18eb566b Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Thu, 12 May 2022 14:22:55 +0200 Subject: [PATCH 6/6] docs(persistent_session): Apply suggestions from William --- apps/emqx/i18n/emqx_schema_i18n.conf | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/apps/emqx/i18n/emqx_schema_i18n.conf b/apps/emqx/i18n/emqx_schema_i18n.conf index e10d5ac9b..6e4b9fda7 100644 --- a/apps/emqx/i18n/emqx_schema_i18n.conf +++ b/apps/emqx/i18n/emqx_schema_i18n.conf @@ -513,7 +513,7 @@ emqx_schema { desc { en: "Database management system used to store information about persistent sessions and messages.\n" "- `builtin`: Use the embedded database (mria)" - zh: "数据库管理系统,用于存储有关持久性会话和消息的信息\n" + zh: "用于存储持久性会话和信息的数据库管理后端\n" "- `builtin`: 使用内置的数据库(mria)" } label: { @@ -527,9 +527,9 @@ emqx_schema { en: "Save information about the persistent sessions on disc.\n" "If this option is enabled, persistent sessions will survive full restart of the cluster.\n" "Otherwise, all the data will be stored in RAM, and it will be lost when all the nodes in the cluster are stopped." - zh: "在磁盘上保存关于持久化会话的信息。" - "如果启用了这个选项,持久化会话将在集群完全重启后继续存在。" - "否则,所有的数据都将存储在内存中,当集群中的所有节点都停止时,它就会丢失。" + zh: "将持久会话数据保存在磁盘上。如果为 false 则存储在内存中。\n" + "如开启, 持久会话数据可在集群重启后恢复。\n" + "如关闭, 数据仅存储在内存中, 则在整个集群停止后丢失。" } label: { en: "Persist on disc" @@ -592,7 +592,7 @@ emqx_schema { persistent_session_builtin_session_table { desc { en: "Performance tuning options for built-in session table." - zh: "优化内置会话表的配置。" + zh: "用于内建会话表的性能调优参数" } label: { en: "Persistent session" @@ -607,14 +607,14 @@ emqx_schema { } label: { en: "Persistent session messages" - zh: "持久化会话消息" + zh: "用于内建会话管理表的性能调优参数" } } persistent_session_builtin_messages_table { desc { en: "Performance tuning options for built-in messages table." - zh: "优化内置消息表配置。" + zh: "用于内建消息表的性能调优参数" } label: { en: "Persistent messages"