feat(persistent_sessions): make table type in mnesia backend configurable
This commit is contained in:
parent
716dcd0086
commit
ab3526260c
|
@ -1646,6 +1646,12 @@ persistent_session_store {
|
||||||
## ValueType: Boolean
|
## ValueType: Boolean
|
||||||
## Default: false
|
## Default: false
|
||||||
enabled = false
|
enabled = false
|
||||||
|
## Which database backend should be used
|
||||||
|
##
|
||||||
|
## @doc persistent_session_store.db_backend
|
||||||
|
## ValueType: mnesia_ram | mnesia_disc
|
||||||
|
## Default: mnesia_ram
|
||||||
|
db_backend = mnesia_ram
|
||||||
|
|
||||||
## How long are undelivered messages retained in the store
|
## How long are undelivered messages retained in the store
|
||||||
##
|
##
|
||||||
|
|
|
@ -82,12 +82,17 @@
|
||||||
init_db_backend() ->
|
init_db_backend() ->
|
||||||
case is_store_enabled() of
|
case is_store_enabled() of
|
||||||
true ->
|
true ->
|
||||||
ok = emqx_trie:create_session_trie(),
|
TableType =
|
||||||
emqx_persistent_session_mnesia_backend:create_tables(),
|
case emqx_config:get(?db_backend_key) of
|
||||||
persistent_term:put(?db_backend_key, emqx_persistent_session_mnesia_backend),
|
mnesia_ram -> ram_copies;
|
||||||
|
mnesia_disc -> disc_copies
|
||||||
|
end,
|
||||||
|
ok = emqx_trie:create_session_trie(TableType),
|
||||||
|
emqx_persistent_session_mnesia_backend:create_tables(TableType),
|
||||||
|
persistent_term:put(?db_backend_module, emqx_persistent_session_mnesia_backend),
|
||||||
ok;
|
ok;
|
||||||
false ->
|
false ->
|
||||||
persistent_term:put(?db_backend_key, emqx_persistent_session_dummy_backend),
|
persistent_term:put(?db_backend_module, emqx_persistent_session_dummy_backend),
|
||||||
ok
|
ok
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
|
@ -30,4 +30,5 @@
|
||||||
-define(is_enabled_key, [persistent_session_store, enabled]).
|
-define(is_enabled_key, [persistent_session_store, enabled]).
|
||||||
-define(msg_retain, [persistent_session_store, max_retain_undelivered]).
|
-define(msg_retain, [persistent_session_store, max_retain_undelivered]).
|
||||||
|
|
||||||
-define(db_backend, (persistent_term:get(?db_backend_key))).
|
-define(db_backend_module, [persistent_session_store, db_backend_module]).
|
||||||
|
-define(db_backend, (persistent_term:get(?db_backend_module))).
|
||||||
|
|
|
@ -19,7 +19,7 @@
|
||||||
-include("emqx.hrl").
|
-include("emqx.hrl").
|
||||||
-include("emqx_persistent_session.hrl").
|
-include("emqx_persistent_session.hrl").
|
||||||
|
|
||||||
-export([ create_tables/0
|
-export([ create_tables/1
|
||||||
, first_message_id/0
|
, first_message_id/0
|
||||||
, next_message_id/1
|
, next_message_id/1
|
||||||
, delete_message/1
|
, delete_message/1
|
||||||
|
@ -35,11 +35,12 @@
|
||||||
, ro_transaction/1
|
, ro_transaction/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
create_tables() ->
|
create_tables(TableType) when TableType =:= ram_copies;
|
||||||
|
TableType =:= disc_copies ->
|
||||||
ok = mria:create_table(?SESSION_STORE, [
|
ok = mria:create_table(?SESSION_STORE, [
|
||||||
{type, set},
|
{type, set},
|
||||||
{rlog_shard, ?PERSISTENT_SESSION_SHARD},
|
{rlog_shard, ?PERSISTENT_SESSION_SHARD},
|
||||||
{storage, disc_copies},
|
{storage, TableType},
|
||||||
{record_name, session_store},
|
{record_name, session_store},
|
||||||
{attributes, record_info(fields, session_store)},
|
{attributes, record_info(fields, session_store)},
|
||||||
{storage_properties, [{ets, [{read_concurrency, true}]}]}]),
|
{storage_properties, [{ets, [{read_concurrency, true}]}]}]),
|
||||||
|
@ -47,7 +48,7 @@ create_tables() ->
|
||||||
ok = mria:create_table(?SESS_MSG_TAB, [
|
ok = mria:create_table(?SESS_MSG_TAB, [
|
||||||
{type, ordered_set},
|
{type, ordered_set},
|
||||||
{rlog_shard, ?PERSISTENT_SESSION_SHARD},
|
{rlog_shard, ?PERSISTENT_SESSION_SHARD},
|
||||||
{storage, disc_copies},
|
{storage, TableType},
|
||||||
{record_name, session_msg},
|
{record_name, session_msg},
|
||||||
{attributes, record_info(fields, session_msg)},
|
{attributes, record_info(fields, session_msg)},
|
||||||
{storage_properties, [{ets, [{read_concurrency, true},
|
{storage_properties, [{ets, [{read_concurrency, true},
|
||||||
|
@ -56,7 +57,7 @@ create_tables() ->
|
||||||
ok = mria:create_table(?MSG_TAB, [
|
ok = mria:create_table(?MSG_TAB, [
|
||||||
{type, ordered_set},
|
{type, ordered_set},
|
||||||
{rlog_shard, ?PERSISTENT_SESSION_SHARD},
|
{rlog_shard, ?PERSISTENT_SESSION_SHARD},
|
||||||
{storage, disc_copies},
|
{storage, TableType},
|
||||||
{record_name, message},
|
{record_name, message},
|
||||||
{attributes, record_info(fields, message)},
|
{attributes, record_info(fields, message)},
|
||||||
{storage_properties, [{ets, [{read_concurrency, true},
|
{storage_properties, [{ets, [{read_concurrency, true},
|
||||||
|
|
|
@ -160,6 +160,10 @@ fields("persistent_session_store") ->
|
||||||
sc(boolean(),
|
sc(boolean(),
|
||||||
#{ default => "false"
|
#{ default => "false"
|
||||||
})},
|
})},
|
||||||
|
{"db_backend",
|
||||||
|
sc(hoconsc:union([mnesia_ram, mnesia_disc]),
|
||||||
|
#{ default => "mnesia_ram"
|
||||||
|
})},
|
||||||
{"max_retain_undelivered",
|
{"max_retain_undelivered",
|
||||||
sc(duration(),
|
sc(duration(),
|
||||||
#{ default => "1h"
|
#{ default => "1h"
|
||||||
|
|
|
@ -20,7 +20,7 @@
|
||||||
|
|
||||||
%% Mnesia bootstrap
|
%% Mnesia bootstrap
|
||||||
-export([ mnesia/1
|
-export([ mnesia/1
|
||||||
, create_session_trie/0
|
, create_session_trie/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-boot_mnesia({mnesia, [boot]}).
|
-boot_mnesia({mnesia, [boot]}).
|
||||||
|
@ -75,13 +75,14 @@ mnesia(boot) ->
|
||||||
{type, ordered_set},
|
{type, ordered_set},
|
||||||
{storage_properties, StoreProps}]).
|
{storage_properties, StoreProps}]).
|
||||||
|
|
||||||
create_session_trie() ->
|
create_session_trie(TableType) when TableType =:= ram_copies;
|
||||||
|
TableType =:= disc_copies ->
|
||||||
StoreProps = [{ets, [{read_concurrency, true},
|
StoreProps = [{ets, [{read_concurrency, true},
|
||||||
{write_concurrency, true}
|
{write_concurrency, true}
|
||||||
]}],
|
]}],
|
||||||
ok = mria:create_table(?SESSION_TRIE,
|
ok = mria:create_table(?SESSION_TRIE,
|
||||||
[{rlog_shard, ?ROUTE_SHARD},
|
[{rlog_shard, ?ROUTE_SHARD},
|
||||||
{storage, disc_copies},
|
{storage, TableType},
|
||||||
{record_name, ?TRIE},
|
{record_name, ?TRIE},
|
||||||
{attributes, record_info(fields, ?TRIE)},
|
{attributes, record_info(fields, ?TRIE)},
|
||||||
{type, ordered_set},
|
{type, ordered_set},
|
||||||
|
|
Loading…
Reference in New Issue