Merge pull request #7897 from ieQu1/persist-sess-schema

feat(persistent_session): Make schema more flexible
This commit is contained in:
ieQu1 2022-05-13 11:53:30 +02:00 committed by GitHub
commit d277e2b644
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 326 additions and 328 deletions

View File

@ -509,22 +509,42 @@ emqx_schema {
} }
} }
persistent_session_store_storage_type { persistent_session_store_backend {
desc { desc {
en: "Store information about persistent sessions on disc or in ram.\n" en: "Database management system used to store information about persistent sessions and messages.\n"
"If ram is chosen, all information about persistent sessions remains\n" "- `builtin`: Use the embedded database (mria)"
"as long as at least one node in a cluster is alive to keep the information.\n" zh: "用于存储持久性会话和信息的数据库管理后端\n"
"If disc is chosen, the information is persisted on disc and will survive\n" "- `builtin`: 使用内置的数据库mria"
"cluster restart, at the price of more disc usage and less throughput.\n"
zh: "将有关持久会话的信息存储在磁盘或内存中。\n"
"如果选择了ram有关持久会话的所有信息将保留\n"
"只要群集中至少有一个节点处于活动状态,就可以保留信息。\n"
"如果选择了光盘,则信息将保留在光盘上,并且将继续存在\n"
"群集重新启动,代价是磁盘使用量增加,吞吐量降低。\n"
} }
label: { label: {
en: "Storage type" en: "Backend"
zh: "存储类型" zh: "后端类型"
}
}
persistent_store_on_disc {
desc {
en: "Save information about the persistent sessions on disc.\n"
"If this option is enabled, persistent sessions will survive full restart of the cluster.\n"
"Otherwise, all the data will be stored in RAM, and it will be lost when all the nodes in the cluster are stopped."
zh: "将持久会话数据保存在磁盘上。如果为 false 则存储在内存中。\n"
"如开启, 持久会话数据可在集群重启后恢复。\n"
"如关闭, 数据仅存储在内存中, 则在整个集群停止后丢失。"
}
label: {
en: "Persist on disc"
zh: "持久化在磁盘上"
}
}
persistent_store_ram_cache {
desc {
en: "Maintain a copy of the data in RAM for faster access."
zh: "在内存中保持一份数据的副本,以便更快地访问。"
}
label: {
en: "RAM cache"
zh: "内存缓存"
} }
} }
@ -569,6 +589,39 @@ emqx_schema {
} }
} }
persistent_session_builtin_session_table {
desc {
en: "Performance tuning options for built-in session table."
zh: "用于内建会话表的性能调优参数"
}
label: {
en: "Persistent session"
zh: "持久会话"
}
}
persistent_session_builtin_sess_msg_table {
desc {
en: "Performance tuning options for built-in session messages table."
zh: "优化内置的会话消息表的配置。"
}
label: {
en: "Persistent session messages"
zh: "用于内建会话管理表的性能调优参数"
}
}
persistent_session_builtin_messages_table {
desc {
en: "Performance tuning options for built-in messages table."
zh: "用于内建消息表的性能调优参数"
}
label: {
en: "Persistent messages"
zh: "持久化消息"
}
}
stats_enable { stats_enable {
desc { desc {
en: "Enable/disable statistic data collection." en: "Enable/disable statistic data collection."

View File

@ -1,121 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_persistent_session_mnesia_disc_backend).
-include("emqx.hrl").
-include("emqx_persistent_session.hrl").
-export([
create_tables/0,
first_message_id/0,
next_message_id/1,
delete_message/1,
first_session_message/0,
next_session_message/1,
delete_session_message/1,
put_session_store/1,
delete_session_store/1,
lookup_session_store/1,
put_session_message/1,
put_message/1,
get_message/1,
ro_transaction/1
]).
create_tables() ->
ok = mria:create_table(?SESSION_STORE_DISC, [
{type, set},
{rlog_shard, ?PERSISTENT_SESSION_SHARD},
{storage, disc_copies},
{record_name, session_store},
{attributes, record_info(fields, session_store)},
{storage_properties, [{ets, [{read_concurrency, true}]}]}
]),
ok = mria:create_table(?SESS_MSG_TAB_DISC, [
{type, ordered_set},
{rlog_shard, ?PERSISTENT_SESSION_SHARD},
{storage, disc_copies},
{record_name, session_msg},
{attributes, record_info(fields, session_msg)},
{storage_properties, [
{ets, [
{read_concurrency, true},
{write_concurrency, true}
]}
]}
]),
ok = mria:create_table(?MSG_TAB_DISC, [
{type, ordered_set},
{rlog_shard, ?PERSISTENT_SESSION_SHARD},
{storage, disc_copies},
{record_name, message},
{attributes, record_info(fields, message)},
{storage_properties, [
{ets, [
{read_concurrency, true},
{write_concurrency, true}
]}
]}
]).
first_session_message() ->
mnesia:dirty_first(?SESS_MSG_TAB_DISC).
next_session_message(Key) ->
mnesia:dirty_next(?SESS_MSG_TAB_DISC, Key).
first_message_id() ->
mnesia:dirty_first(?MSG_TAB_DISC).
next_message_id(Key) ->
mnesia:dirty_next(?MSG_TAB_DISC, Key).
delete_message(Key) ->
mria:dirty_delete(?MSG_TAB_DISC, Key).
delete_session_message(Key) ->
mria:dirty_delete(?SESS_MSG_TAB_DISC, Key).
put_session_store(SS) ->
mria:dirty_write(?SESSION_STORE_DISC, SS).
delete_session_store(ClientID) ->
mria:dirty_delete(?SESSION_STORE_DISC, ClientID).
lookup_session_store(ClientID) ->
case mnesia:dirty_read(?SESSION_STORE_DISC, ClientID) of
[] -> none;
[SS] -> {value, SS}
end.
put_session_message(SessMsg) ->
mria:dirty_write(?SESS_MSG_TAB_DISC, SessMsg).
put_message(Msg) ->
mria:dirty_write(?MSG_TAB_DISC, Msg).
get_message(MsgId) ->
case mnesia:read(?MSG_TAB_DISC, MsgId) of
[] -> error({msg_not_found, MsgId});
[Msg] -> Msg
end.
ro_transaction(Fun) ->
{atomic, Res} = mria:ro_transaction(?PERSISTENT_SESSION_SHARD, Fun),
Res.

View File

@ -1,121 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_persistent_session_mnesia_ram_backend).
-include("emqx.hrl").
-include("emqx_persistent_session.hrl").
-export([
create_tables/0,
first_message_id/0,
next_message_id/1,
delete_message/1,
first_session_message/0,
next_session_message/1,
delete_session_message/1,
put_session_store/1,
delete_session_store/1,
lookup_session_store/1,
put_session_message/1,
put_message/1,
get_message/1,
ro_transaction/1
]).
create_tables() ->
ok = mria:create_table(?SESSION_STORE_RAM, [
{type, set},
{rlog_shard, ?PERSISTENT_SESSION_SHARD},
{storage, ram_copies},
{record_name, session_store},
{attributes, record_info(fields, session_store)},
{storage_properties, [{ets, [{read_concurrency, true}]}]}
]),
ok = mria:create_table(?SESS_MSG_TAB_RAM, [
{type, ordered_set},
{rlog_shard, ?PERSISTENT_SESSION_SHARD},
{storage, ram_copies},
{record_name, session_msg},
{attributes, record_info(fields, session_msg)},
{storage_properties, [
{ets, [
{read_concurrency, true},
{write_concurrency, true}
]}
]}
]),
ok = mria:create_table(?MSG_TAB_RAM, [
{type, ordered_set},
{rlog_shard, ?PERSISTENT_SESSION_SHARD},
{storage, ram_copies},
{record_name, message},
{attributes, record_info(fields, message)},
{storage_properties, [
{ets, [
{read_concurrency, true},
{write_concurrency, true}
]}
]}
]).
first_session_message() ->
mnesia:dirty_first(?SESS_MSG_TAB_RAM).
next_session_message(Key) ->
mnesia:dirty_next(?SESS_MSG_TAB_RAM, Key).
first_message_id() ->
mnesia:dirty_first(?MSG_TAB_RAM).
next_message_id(Key) ->
mnesia:dirty_next(?MSG_TAB_RAM, Key).
delete_message(Key) ->
mria:dirty_delete(?MSG_TAB_RAM, Key).
delete_session_message(Key) ->
mria:dirty_delete(?SESS_MSG_TAB_RAM, Key).
put_session_store(SS) ->
mria:dirty_write(?SESSION_STORE_RAM, SS).
delete_session_store(ClientID) ->
mria:dirty_delete(?SESSION_STORE_RAM, ClientID).
lookup_session_store(ClientID) ->
case mnesia:dirty_read(?SESSION_STORE_RAM, ClientID) of
[] -> none;
[SS] -> {value, SS}
end.
put_session_message(SessMsg) ->
mria:dirty_write(?SESS_MSG_TAB_RAM, SessMsg).
put_message(Msg) ->
mria:dirty_write(?MSG_TAB_RAM, Msg).
get_message(MsgId) ->
case mnesia:read(?MSG_TAB_RAM, MsgId) of
[] -> error({msg_not_found, MsgId});
[Msg] -> Msg
end.
ro_transaction(Fun) ->
{atomic, Res} = mria:ro_transaction(?PERSISTENT_SESSION_SHARD, Fun),
Res.

View File

@ -212,12 +212,36 @@ fields("persistent_session_store") ->
desc => ?DESC(persistent_session_store_enabled) desc => ?DESC(persistent_session_store_enabled)
} }
)}, )},
{"storage_type", {"on_disc",
sc( sc(
hoconsc:union([ram, disc]), boolean(),
#{ #{
default => disc, default => true,
desc => ?DESC(persistent_session_store_storage_type) desc => ?DESC(persistent_store_on_disc)
}
)},
{"ram_cache",
sc(
boolean(),
#{
default => false,
desc => ?DESC(persistent_store_ram_cache)
}
)},
{"backend",
sc(
hoconsc:union([ref("persistent_session_builtin")]),
#{
default => #{
<<"type">> => <<"builtin">>,
<<"session">> =>
#{<<"ram_cache">> => <<"true">>},
<<"session_messages">> =>
#{<<"ram_cache">> => <<"true">>},
<<"messages">> =>
#{<<"ram_cache">> => <<"false">>}
},
desc => ?DESC(persistent_session_store_backend)
} }
)}, )},
{"max_retain_undelivered", {"max_retain_undelivered",
@ -245,6 +269,33 @@ fields("persistent_session_store") ->
} }
)} )}
]; ];
fields("persistent_table_mria_opts") ->
[
{"ram_cache",
sc(
boolean(),
#{
default => true,
desc => ?DESC(persistent_store_ram_cache)
}
)}
];
fields("persistent_session_builtin") ->
[
{"type", sc(hoconsc:enum([builtin]), #{default => builtin, desc => ""})},
{"session",
sc(ref("persistent_table_mria_opts"), #{
desc => ?DESC(persistent_session_builtin_session_table)
})},
{"session_messages",
sc(ref("persistent_table_mria_opts"), #{
desc => ?DESC(persistent_session_builtin_sess_msg_table)
})},
{"messages",
sc(ref("persistent_table_mria_opts"), #{
desc => ?DESC(persistent_session_builtin_messages_table)
})}
];
fields("stats") -> fields("stats") ->
[ [
{"enable", {"enable",
@ -1526,6 +1577,10 @@ base_listener() ->
desc("persistent_session_store") -> desc("persistent_session_store") ->
"Settings for message persistence."; "Settings for message persistence.";
desc("persistent_session_builtin") ->
"Settings for the built-in storage engine of persistent messages.";
desc("persistent_table_mria_opts") ->
"Tuning options for the mria table.";
desc("stats") -> desc("stats") ->
"Enable/disable statistic data collection.\n" "Enable/disable statistic data collection.\n"
"Statistic data such as message receive/send count/rate etc. " "Statistic data such as message receive/send count/rate etc. "

View File

@ -50,8 +50,7 @@
-compile(nowarn_export_all). -compile(nowarn_export_all).
-endif. -endif.
-define(SESSION_DISC_TRIE, emqx_session_trie_disc). -define(SESSION_TRIE, emqx_session_trie).
-define(SESSION_RAM_TRIE, emqx_session_trie_ram).
-define(PREFIX(Prefix), {Prefix, 0}). -define(PREFIX(Prefix), {Prefix, 0}).
-define(TOPIC(Topic), {Topic, 1}). -define(TOPIC(Topic), {Topic, 1}).
@ -82,7 +81,12 @@ mnesia(boot) ->
{storage_properties, StoreProps} {storage_properties, StoreProps}
]). ]).
create_session_trie(disc) -> create_session_trie(Type) ->
Storage =
case Type of
disc -> disc_copies;
ram -> ram_copies
end,
StoreProps = [ StoreProps = [
{ets, [ {ets, [
{read_concurrency, true}, {read_concurrency, true},
@ -90,28 +94,10 @@ create_session_trie(disc) ->
]} ]}
], ],
ok = mria:create_table( ok = mria:create_table(
?SESSION_DISC_TRIE, ?SESSION_TRIE,
[ [
{rlog_shard, ?ROUTE_SHARD}, {rlog_shard, ?ROUTE_SHARD},
{storage, disc_copies}, {storage, Storage},
{record_name, ?TRIE},
{attributes, record_info(fields, ?TRIE)},
{type, ordered_set},
{storage_properties, StoreProps}
]
);
create_session_trie(ram) ->
StoreProps = [
{ets, [
{read_concurrency, true},
{write_concurrency, true}
]}
],
ok = mria:create_table(
?SESSION_RAM_TRIE,
[
{rlog_shard, ?ROUTE_SHARD},
{storage, ram_copies},
{record_name, ?TRIE}, {record_name, ?TRIE},
{attributes, record_info(fields, ?TRIE)}, {attributes, record_info(fields, ?TRIE)},
{type, ordered_set}, {type, ordered_set},
@ -204,10 +190,7 @@ lock_session_tables() ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
session_trie() -> session_trie() ->
case emqx_persistent_session:storage_type() of ?SESSION_TRIE.
disc -> ?SESSION_DISC_TRIE;
ram -> ?SESSION_RAM_TRIE
end.
make_keys(Topic) -> make_keys(Topic) ->
Words = emqx_topic:words(Topic), Words = emqx_topic:words(Topic),

View File

@ -19,6 +19,7 @@
-export([ -export([
is_store_enabled/0, is_store_enabled/0,
init_db_backend/0, init_db_backend/0,
storage_backend/0,
storage_type/0 storage_type/0
]). ]).
@ -81,6 +82,9 @@
-type gc_traverse_fun() :: fun(('delete' | 'marker' | 'abandoned', sess_msg_key()) -> 'ok'). -type gc_traverse_fun() :: fun(('delete' | 'marker' | 'abandoned', sess_msg_key()) -> 'ok').
%% EMQX configuration keys
-define(conf_storage_backend, [persistent_session_store, backend, type]).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Init %% Init
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -91,27 +95,23 @@ init_db_backend() ->
StorageType = storage_type(), StorageType = storage_type(),
ok = emqx_trie:create_session_trie(StorageType), ok = emqx_trie:create_session_trie(StorageType),
ok = emqx_session_router:create_router_tab(StorageType), ok = emqx_session_router:create_router_tab(StorageType),
case StorageType of case storage_backend() of
disc -> builtin ->
emqx_persistent_session_mnesia_disc_backend:create_tables(), emqx_persistent_session_backend_builtin:create_tables(),
persistent_term:put( persistent_term:put(?db_backend_key, emqx_persistent_session_backend_builtin)
?db_backend_key, emqx_persistent_session_mnesia_disc_backend
);
ram ->
emqx_persistent_session_mnesia_ram_backend:create_tables(),
persistent_term:put(?db_backend_key, emqx_persistent_session_mnesia_ram_backend)
end, end,
ok; ok;
false -> false ->
persistent_term:put(?db_backend_key, emqx_persistent_session_dummy_backend), persistent_term:put(?db_backend_key, emqx_persistent_session_backend_dummy),
ok ok
end. end.
is_store_enabled() -> is_store_enabled() ->
emqx_config:get(?is_enabled_key). emqx_config:get(?is_enabled_key).
storage_type() -> -spec storage_backend() -> builtin.
emqx_config:get(?storage_type_key). storage_backend() ->
emqx_config:get(?conf_storage_backend).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Session message ADT API %% Session message ADT API
@ -557,3 +557,10 @@ gc_traverse({S, _MsgID, _STopic, ?DELIVERED} = Key, SessionID, Abandoned, Fun) -
%% We have a message that is marked as ?DELIVERED, but the ?UNDELIVERED is missing. %% We have a message that is marked as ?DELIVERED, but the ?UNDELIVERED is missing.
NewAbandoned = S =:= SessionID andalso Abandoned, NewAbandoned = S =:= SessionID andalso Abandoned,
gc_traverse(next_session_message(Key), S, NewAbandoned, Fun). gc_traverse(next_session_message(Key), S, NewAbandoned, Fun).
-spec storage_type() -> ram | disc.
storage_type() ->
case emqx_config:get(?on_disc_key) of
true -> disc;
false -> ram
end.

View File

@ -14,15 +14,6 @@
%% limitations under the License. %% limitations under the License.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-define(SESSION_STORE_DISC, emqx_session_store_disc).
-define(SESSION_STORE_RAM, emqx_session_store_ram).
-define(SESS_MSG_TAB_DISC, emqx_session_msg_disc).
-define(SESS_MSG_TAB_RAM, emqx_session_msg_ram).
-define(MSG_TAB_DISC, emqx_persistent_msg_disc).
-define(MSG_TAB_RAM, emqx_persistent_msg_ram).
-record(session_store, { -record(session_store, {
client_id :: binary(), client_id :: binary(),
expiry_interval :: non_neg_integer(), expiry_interval :: non_neg_integer(),
@ -35,9 +26,14 @@
val = [] :: [] val = [] :: []
}). }).
-define(db_backend_key, [persistent_session_store, db_backend]). -define(cfg_root, persistent_session_store).
-define(is_enabled_key, [persistent_session_store, enabled]). -define(db_backend_key, [?cfg_root, db_backend]).
-define(storage_type_key, [persistent_session_store, storage_type]). -define(is_enabled_key, [?cfg_root, enabled]).
-define(msg_retain, [persistent_session_store, max_retain_undelivered]). -define(msg_retain, [?cfg_root, max_retain_undelivered]).
-define(on_disc_key, [?cfg_root, on_disc]).
-define(SESSION_STORE, emqx_session_store).
-define(SESS_MSG_TAB, emqx_session_msg).
-define(MSG_TAB, emqx_persistent_msg).
-define(db_backend, (persistent_term:get(?db_backend_key))). -define(db_backend, (persistent_term:get(?db_backend_key))).

View File

@ -0,0 +1,150 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_persistent_session_backend_builtin).
-include("emqx.hrl").
-include_lib("typerefl/include/types.hrl").
-include("emqx_persistent_session.hrl").
-export([
create_tables/0,
first_message_id/0,
next_message_id/1,
delete_message/1,
first_session_message/0,
next_session_message/1,
delete_session_message/1,
put_session_store/1,
delete_session_store/1,
lookup_session_store/1,
put_session_message/1,
put_message/1,
get_message/1,
ro_transaction/1
]).
-type mria_table_type() :: ram_copies | disc_copies | rocksdb_copies.
-define(IS_ETS(BACKEND), (BACKEND =:= ram_copies orelse BACKEND =:= disc_copies)).
create_tables() ->
SessStoreBackend = table_type(session),
ok = mria:create_table(?SESSION_STORE, [
{type, set},
{rlog_shard, ?PERSISTENT_SESSION_SHARD},
{storage, SessStoreBackend},
{record_name, session_store},
{attributes, record_info(fields, session_store)},
{storage_properties, storage_properties(?SESSION_STORE, SessStoreBackend)}
]),
SessMsgBackend = table_type(session_messages),
ok = mria:create_table(?SESS_MSG_TAB, [
{type, ordered_set},
{rlog_shard, ?PERSISTENT_SESSION_SHARD},
{storage, SessMsgBackend},
{record_name, session_msg},
{attributes, record_info(fields, session_msg)},
{storage_properties, storage_properties(?SESS_MSG_TAB, SessMsgBackend)}
]),
MsgBackend = table_type(messages),
ok = mria:create_table(?MSG_TAB, [
{type, ordered_set},
{rlog_shard, ?PERSISTENT_SESSION_SHARD},
{storage, MsgBackend},
{record_name, message},
{attributes, record_info(fields, message)},
{storage_properties, storage_properties(?MSG_TAB, MsgBackend)}
]).
first_session_message() ->
mnesia:dirty_first(?SESS_MSG_TAB).
next_session_message(Key) ->
mnesia:dirty_next(?SESS_MSG_TAB, Key).
first_message_id() ->
mnesia:dirty_first(?MSG_TAB).
next_message_id(Key) ->
mnesia:dirty_next(?MSG_TAB, Key).
delete_message(Key) ->
mria:dirty_delete(?MSG_TAB, Key).
delete_session_message(Key) ->
mria:dirty_delete(?SESS_MSG_TAB, Key).
put_session_store(SS) ->
mria:dirty_write(?SESSION_STORE, SS).
delete_session_store(ClientID) ->
mria:dirty_delete(?SESSION_STORE, ClientID).
lookup_session_store(ClientID) ->
case mnesia:dirty_read(?SESSION_STORE, ClientID) of
[] -> none;
[SS] -> {value, SS}
end.
put_session_message(SessMsg) ->
mria:dirty_write(?SESS_MSG_TAB, SessMsg).
put_message(Msg) ->
mria:dirty_write(?MSG_TAB, Msg).
get_message(MsgId) ->
case mnesia:read(?MSG_TAB, MsgId) of
[] -> error({msg_not_found, MsgId});
[Msg] -> Msg
end.
ro_transaction(Fun) ->
{atomic, Res} = mria:ro_transaction(?PERSISTENT_SESSION_SHARD, Fun),
Res.
-spec storage_properties(?SESSION_STORE | ?SESS_MSG_TAB | ?MSG_TAB, mria_table_type()) -> term().
storage_properties(?SESSION_STORE, Backend) when ?IS_ETS(Backend) ->
[{ets, [{read_concurrency, true}]}];
storage_properties(_, Backend) when ?IS_ETS(Backend) ->
[
{ets, [
{read_concurrency, true},
{write_concurrency, true}
]}
];
storage_properties(_, _) ->
[].
-spec table_type(atom()) -> mria_table_type().
table_type(Table) ->
DiscPersistence = emqx_config:get([?cfg_root, on_disc]),
RamCache = get_overlayed(Table, ram_cache),
case {DiscPersistence, RamCache} of
{true, true} ->
disc_copies;
{true, false} ->
rocksdb_copies;
{false, _} ->
ram_copies
end.
-spec get_overlayed(atom(), on_disc | ram_cache) -> boolean().
get_overlayed(Table, Suffix) ->
Default = emqx_config:get([?cfg_root, Suffix]),
emqx_config:get([?cfg_root, backend, Table, Suffix], Default).

View File

@ -14,7 +14,7 @@
%% limitations under the License. %% limitations under the License.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-module(emqx_persistent_session_dummy_backend). -module(emqx_persistent_session_backend_dummy).
-include("emqx_persistent_session.hrl"). -include("emqx_persistent_session.hrl").

View File

@ -168,8 +168,8 @@ typecheck_apis(
[ [
logger:error( logger:error(
"Incompatible RPC call: " "Incompatible RPC call: "
"type of the parameter ~p of RPC call ~s on release ~p " "type of the parameter ~p of RPC call ~s in release ~p "
"is not a subtype of the target function ~s on release ~p.~n" "is not a subtype of the target function ~s in release ~p.~n"
"Caller type: ~s~nCallee type: ~s~n", "Caller type: ~s~nCallee type: ~s~n",
[ [
Var, Var,

View File

@ -20,7 +20,7 @@
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("../include/emqx.hrl"). -include_lib("../include/emqx.hrl").
-include("../src/emqx_persistent_session.hrl"). -include("../src/persistent_session/emqx_persistent_session.hrl").
-compile(export_all). -compile(export_all).
-compile(nowarn_export_all). -compile(nowarn_export_all).
@ -101,13 +101,12 @@ init_per_group(Group, Config) when Group =:= ram_tables; Group =:= disc_tables -
emqx_common_test_helpers:boot_modules(all), emqx_common_test_helpers:boot_modules(all),
meck:new(emqx_config, [non_strict, passthrough, no_history, no_link]), meck:new(emqx_config, [non_strict, passthrough, no_history, no_link]),
meck:expect(emqx_config, get, fun meck:expect(emqx_config, get, fun
(?storage_type_key) -> Reply; (?on_disc_key) -> Reply =:= disc;
(?is_enabled_key) -> true; (?is_enabled_key) -> true;
(Other) -> meck:passthrough([Other]) (Other) -> meck:passthrough([Other])
end), end),
emqx_common_test_helpers:start_apps([], fun set_special_confs/1), emqx_common_test_helpers:start_apps([], fun set_special_confs/1),
?assertEqual(true, emqx_persistent_session:is_store_enabled()), ?assertEqual(true, emqx_persistent_session:is_store_enabled()),
?assertEqual(Reply, emqx_persistent_session:storage_type()),
Config; Config;
init_per_group(persistent_store_disabled, Config) -> init_per_group(persistent_store_disabled, Config) ->
%% Start Apps %% Start Apps
@ -159,22 +158,17 @@ init_per_group(gc_tests, Config) ->
end), end),
meck:new(mnesia, [non_strict, passthrough, no_history, no_link]), meck:new(mnesia, [non_strict, passthrough, no_history, no_link]),
meck:expect(mnesia, dirty_first, fun meck:expect(mnesia, dirty_first, fun
(?SESS_MSG_TAB_RAM) -> ets:first(SessionMsgEts); (?SESS_MSG_TAB) -> ets:first(SessionMsgEts);
(?SESS_MSG_TAB_DISC) -> ets:first(SessionMsgEts); (?MSG_TAB) -> ets:first(MsgEts);
(?MSG_TAB_RAM) -> ets:first(MsgEts);
(?MSG_TAB_DISC) -> ets:first(MsgEts);
(X) -> meck:passthrough([X]) (X) -> meck:passthrough([X])
end), end),
meck:expect(mnesia, dirty_next, fun meck:expect(mnesia, dirty_next, fun
(?SESS_MSG_TAB_RAM, X) -> ets:next(SessionMsgEts, X); (?SESS_MSG_TAB, X) -> ets:next(SessionMsgEts, X);
(?SESS_MSG_TAB_DISC, X) -> ets:next(SessionMsgEts, X); (?MSG_TAB, X) -> ets:next(MsgEts, X);
(?MSG_TAB_RAM, X) -> ets:next(MsgEts, X);
(?MSG_TAB_DISC, X) -> ets:next(MsgEts, X);
(Tab, X) -> meck:passthrough([Tab, X]) (Tab, X) -> meck:passthrough([Tab, X])
end), end),
meck:expect(mnesia, dirty_delete, fun meck:expect(mnesia, dirty_delete, fun
(?MSG_TAB_RAM, X) -> ets:delete(MsgEts, X); (?MSG_TAB, X) -> ets:delete(MsgEts, X);
(?MSG_TAB_DISC, X) -> ets:delete(MsgEts, X);
(Tab, X) -> meck:passthrough([Tab, X]) (Tab, X) -> meck:passthrough([Tab, X])
end), end),
[{store_owner, Pid}, {session_msg_store, SessionMsgEts}, {msg_store, MsgEts} | Config]. [{store_owner, Pid}, {session_msg_store, SessionMsgEts}, {msg_store, MsgEts} | Config].
@ -193,7 +187,9 @@ end_per_group(gc_tests, Config) ->
meck:unload(mnesia), meck:unload(mnesia),
?config(store_owner, Config) ! stop, ?config(store_owner, Config) ! stop,
ok; ok;
end_per_group(Group, _Config) when Group =:= ram_tables; Group =:= disc_tables -> end_per_group(Group, _Config) when
Group =:= ram_tables; Group =:= disc_tables
->
meck:unload(emqx_config), meck:unload(emqx_config),
emqx_common_test_helpers:stop_apps([]); emqx_common_test_helpers:stop_apps([]);
end_per_group(persistent_store_disabled, _Config) -> end_per_group(persistent_store_disabled, _Config) ->