fix(persistent_sessions): Add both disc and ram tables to avoid schema confusion
This commit is contained in:
parent
ab3526260c
commit
b4d99bc176
|
@ -82,14 +82,14 @@
|
||||||
init_db_backend() ->
|
init_db_backend() ->
|
||||||
case is_store_enabled() of
|
case is_store_enabled() of
|
||||||
true ->
|
true ->
|
||||||
TableType =
|
Backend =
|
||||||
case emqx_config:get(?db_backend_key) of
|
case emqx_config:get(?db_backend_key) of
|
||||||
mnesia_ram -> ram_copies;
|
mnesia_ram -> emqx_persistent_session_mnesia_ram_backend;
|
||||||
mnesia_disc -> disc_copies
|
mnesia_disc -> emqx_persistent_session_mnesia_disc_backend
|
||||||
end,
|
end,
|
||||||
ok = emqx_trie:create_session_trie(TableType),
|
persistent_term:put(?db_backend_module, Backend),
|
||||||
emqx_persistent_session_mnesia_backend:create_tables(TableType),
|
Backend:create_tables(),
|
||||||
persistent_term:put(?db_backend_module, emqx_persistent_session_mnesia_backend),
|
ok = emqx_trie:create_session_trie(),
|
||||||
ok;
|
ok;
|
||||||
false ->
|
false ->
|
||||||
persistent_term:put(?db_backend_module, emqx_persistent_session_dummy_backend),
|
persistent_term:put(?db_backend_module, emqx_persistent_session_dummy_backend),
|
||||||
|
|
|
@ -14,10 +14,6 @@
|
||||||
%% limitations under the License.
|
%% limitations under the License.
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
-define(SESSION_STORE, emqx_session_store).
|
|
||||||
-define(SESS_MSG_TAB, emqx_session_msg).
|
|
||||||
-define(MSG_TAB, emqx_persistent_msg).
|
|
||||||
|
|
||||||
-record(session_store, { client_id :: binary()
|
-record(session_store, { client_id :: binary()
|
||||||
, expiry_interval :: non_neg_integer()
|
, expiry_interval :: non_neg_integer()
|
||||||
, ts :: non_neg_integer()
|
, ts :: non_neg_integer()
|
||||||
|
|
|
@ -0,0 +1,114 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqx_persistent_session_mnesia_disc_backend).
|
||||||
|
|
||||||
|
-include("emqx.hrl").
|
||||||
|
-include("emqx_persistent_session.hrl").
|
||||||
|
|
||||||
|
-define(SESSION_STORE, emqx_session_store_disc).
|
||||||
|
-define(SESS_MSG_TAB, emqx_session_msg_disc).
|
||||||
|
-define(MSG_TAB, emqx_persistent_msg_disc).
|
||||||
|
|
||||||
|
-export([ create_tables/0
|
||||||
|
, first_message_id/0
|
||||||
|
, next_message_id/1
|
||||||
|
, delete_message/1
|
||||||
|
, first_session_message/0
|
||||||
|
, next_session_message/1
|
||||||
|
, delete_session_message/1
|
||||||
|
, put_session_store/1
|
||||||
|
, delete_session_store/1
|
||||||
|
, lookup_session_store/1
|
||||||
|
, put_session_message/1
|
||||||
|
, put_message/1
|
||||||
|
, get_message/1
|
||||||
|
, ro_transaction/1
|
||||||
|
]).
|
||||||
|
|
||||||
|
create_tables() ->
|
||||||
|
ok = mria:create_table(?SESSION_STORE, [
|
||||||
|
{type, set},
|
||||||
|
{rlog_shard, ?PERSISTENT_SESSION_SHARD},
|
||||||
|
{storage, disc_copies},
|
||||||
|
{record_name, session_store},
|
||||||
|
{attributes, record_info(fields, session_store)},
|
||||||
|
{storage_properties, [{ets, [{read_concurrency, true}]}]}]),
|
||||||
|
|
||||||
|
ok = mria:create_table(?SESS_MSG_TAB, [
|
||||||
|
{type, ordered_set},
|
||||||
|
{rlog_shard, ?PERSISTENT_SESSION_SHARD},
|
||||||
|
{storage, disc_copies},
|
||||||
|
{record_name, session_msg},
|
||||||
|
{attributes, record_info(fields, session_msg)},
|
||||||
|
{storage_properties, [{ets, [{read_concurrency, true},
|
||||||
|
{write_concurrency, true}]}]}]),
|
||||||
|
|
||||||
|
ok = mria:create_table(?MSG_TAB, [
|
||||||
|
{type, ordered_set},
|
||||||
|
{rlog_shard, ?PERSISTENT_SESSION_SHARD},
|
||||||
|
{storage, disc_copies},
|
||||||
|
{record_name, message},
|
||||||
|
{attributes, record_info(fields, message)},
|
||||||
|
{storage_properties, [{ets, [{read_concurrency, true},
|
||||||
|
{write_concurrency, true}]}]}]).
|
||||||
|
|
||||||
|
first_session_message() ->
|
||||||
|
mnesia:dirty_first(?SESS_MSG_TAB).
|
||||||
|
|
||||||
|
next_session_message(Key) ->
|
||||||
|
mnesia:dirty_next(?SESS_MSG_TAB, Key).
|
||||||
|
|
||||||
|
first_message_id() ->
|
||||||
|
mnesia:dirty_first(?MSG_TAB).
|
||||||
|
|
||||||
|
next_message_id(Key) ->
|
||||||
|
mnesia:dirty_next(?MSG_TAB, Key).
|
||||||
|
|
||||||
|
delete_message(Key) ->
|
||||||
|
mria:dirty_delete(?MSG_TAB, Key).
|
||||||
|
|
||||||
|
delete_session_message(Key) ->
|
||||||
|
mria:dirty_delete(?SESS_MSG_TAB, Key).
|
||||||
|
|
||||||
|
put_session_store(SS) ->
|
||||||
|
mria:dirty_write(?SESSION_STORE, SS).
|
||||||
|
|
||||||
|
delete_session_store(ClientID) ->
|
||||||
|
mria:dirty_delete(?SESSION_STORE, ClientID).
|
||||||
|
|
||||||
|
lookup_session_store(ClientID) ->
|
||||||
|
case mnesia:dirty_read(?SESSION_STORE, ClientID) of
|
||||||
|
[] -> none;
|
||||||
|
[SS] -> {value, SS}
|
||||||
|
end.
|
||||||
|
|
||||||
|
put_session_message(SessMsg) ->
|
||||||
|
mria:dirty_write(?SESS_MSG_TAB, SessMsg).
|
||||||
|
|
||||||
|
put_message(Msg) ->
|
||||||
|
mria:dirty_write(?MSG_TAB, Msg).
|
||||||
|
|
||||||
|
get_message(MsgId) ->
|
||||||
|
case mnesia:read(?MSG_TAB, MsgId) of
|
||||||
|
[] -> error({msg_not_found, MsgId});
|
||||||
|
[Msg] -> Msg
|
||||||
|
end.
|
||||||
|
|
||||||
|
ro_transaction(Fun) ->
|
||||||
|
{atomic, Res} = mria:ro_transaction(?PERSISTENT_SESSION_SHARD, Fun),
|
||||||
|
Res.
|
||||||
|
|
|
@ -14,12 +14,16 @@
|
||||||
%% limitations under the License.
|
%% limitations under the License.
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
-module(emqx_persistent_session_mnesia_backend).
|
-module(emqx_persistent_session_mnesia_ram_backend).
|
||||||
|
|
||||||
-include("emqx.hrl").
|
-include("emqx.hrl").
|
||||||
-include("emqx_persistent_session.hrl").
|
-include("emqx_persistent_session.hrl").
|
||||||
|
|
||||||
-export([ create_tables/1
|
-define(SESSION_STORE, emqx_session_store_ram).
|
||||||
|
-define(SESS_MSG_TAB, emqx_session_msg_ram).
|
||||||
|
-define(MSG_TAB, emqx_persistent_msg_ram).
|
||||||
|
|
||||||
|
-export([ create_tables/0
|
||||||
, first_message_id/0
|
, first_message_id/0
|
||||||
, next_message_id/1
|
, next_message_id/1
|
||||||
, delete_message/1
|
, delete_message/1
|
||||||
|
@ -35,12 +39,11 @@
|
||||||
, ro_transaction/1
|
, ro_transaction/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
create_tables(TableType) when TableType =:= ram_copies;
|
create_tables() ->
|
||||||
TableType =:= disc_copies ->
|
|
||||||
ok = mria:create_table(?SESSION_STORE, [
|
ok = mria:create_table(?SESSION_STORE, [
|
||||||
{type, set},
|
{type, set},
|
||||||
{rlog_shard, ?PERSISTENT_SESSION_SHARD},
|
{rlog_shard, ?PERSISTENT_SESSION_SHARD},
|
||||||
{storage, TableType},
|
{storage, ram_copies},
|
||||||
{record_name, session_store},
|
{record_name, session_store},
|
||||||
{attributes, record_info(fields, session_store)},
|
{attributes, record_info(fields, session_store)},
|
||||||
{storage_properties, [{ets, [{read_concurrency, true}]}]}]),
|
{storage_properties, [{ets, [{read_concurrency, true}]}]}]),
|
||||||
|
@ -48,7 +51,7 @@ create_tables(TableType) when TableType =:= ram_copies;
|
||||||
ok = mria:create_table(?SESS_MSG_TAB, [
|
ok = mria:create_table(?SESS_MSG_TAB, [
|
||||||
{type, ordered_set},
|
{type, ordered_set},
|
||||||
{rlog_shard, ?PERSISTENT_SESSION_SHARD},
|
{rlog_shard, ?PERSISTENT_SESSION_SHARD},
|
||||||
{storage, TableType},
|
{storage, ram_copies},
|
||||||
{record_name, session_msg},
|
{record_name, session_msg},
|
||||||
{attributes, record_info(fields, session_msg)},
|
{attributes, record_info(fields, session_msg)},
|
||||||
{storage_properties, [{ets, [{read_concurrency, true},
|
{storage_properties, [{ets, [{read_concurrency, true},
|
||||||
|
@ -57,7 +60,7 @@ create_tables(TableType) when TableType =:= ram_copies;
|
||||||
ok = mria:create_table(?MSG_TAB, [
|
ok = mria:create_table(?MSG_TAB, [
|
||||||
{type, ordered_set},
|
{type, ordered_set},
|
||||||
{rlog_shard, ?PERSISTENT_SESSION_SHARD},
|
{rlog_shard, ?PERSISTENT_SESSION_SHARD},
|
||||||
{storage, TableType},
|
{storage, ram_copies},
|
||||||
{record_name, message},
|
{record_name, message},
|
||||||
{attributes, record_info(fields, message)},
|
{attributes, record_info(fields, message)},
|
||||||
{storage_properties, [{ets, [{read_concurrency, true},
|
{storage_properties, [{ets, [{read_concurrency, true},
|
|
@ -17,10 +17,11 @@
|
||||||
-module(emqx_trie).
|
-module(emqx_trie).
|
||||||
|
|
||||||
-include("emqx.hrl").
|
-include("emqx.hrl").
|
||||||
|
-include("emqx_persistent_session.hrl").
|
||||||
|
|
||||||
%% Mnesia bootstrap
|
%% Mnesia bootstrap
|
||||||
-export([ mnesia/1
|
-export([ mnesia/1
|
||||||
, create_session_trie/1
|
, create_session_trie/0
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-boot_mnesia({mnesia, [boot]}).
|
-boot_mnesia({mnesia, [boot]}).
|
||||||
|
@ -48,7 +49,8 @@
|
||||||
-endif.
|
-endif.
|
||||||
|
|
||||||
-define(TRIE, emqx_trie).
|
-define(TRIE, emqx_trie).
|
||||||
-define(SESSION_TRIE, emqx_session_trie).
|
-define(SESSION_TRIE_RAM, emqx_session_trie_ram).
|
||||||
|
-define(SESSION_TRIE_DISC, emqx_session_trie_disc).
|
||||||
-define(PREFIX(Prefix), {Prefix, 0}).
|
-define(PREFIX(Prefix), {Prefix, 0}).
|
||||||
-define(TOPIC(Topic), {Topic, 1}).
|
-define(TOPIC(Topic), {Topic, 1}).
|
||||||
|
|
||||||
|
@ -57,6 +59,12 @@
|
||||||
, count = 0 :: non_neg_integer()
|
, count = 0 :: non_neg_integer()
|
||||||
}).
|
}).
|
||||||
|
|
||||||
|
-define(ACTIVE_SESSION_TRIE, case emqx_config:get(?db_backend_key) of
|
||||||
|
mnesia_ram -> ?SESSION_TRIE_RAM;
|
||||||
|
mnesia_disc -> ?SESSION_TRIE_DISC
|
||||||
|
end).
|
||||||
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Mnesia bootstrap
|
%% Mnesia bootstrap
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -75,19 +83,26 @@ mnesia(boot) ->
|
||||||
{type, ordered_set},
|
{type, ordered_set},
|
||||||
{storage_properties, StoreProps}]).
|
{storage_properties, StoreProps}]).
|
||||||
|
|
||||||
create_session_trie(TableType) when TableType =:= ram_copies;
|
create_session_trie() ->
|
||||||
TableType =:= disc_copies ->
|
|
||||||
StoreProps = [{ets, [{read_concurrency, true},
|
StoreProps = [{ets, [{read_concurrency, true},
|
||||||
{write_concurrency, true}
|
{write_concurrency, true}
|
||||||
]}],
|
]}],
|
||||||
ok = mria:create_table(?SESSION_TRIE,
|
ok = mria:create_table(?SESSION_TRIE_RAM,
|
||||||
[{rlog_shard, ?ROUTE_SHARD},
|
[{rlog_shard, ?ROUTE_SHARD},
|
||||||
{storage, TableType},
|
{storage, ram_copies},
|
||||||
|
{record_name, ?TRIE},
|
||||||
|
{attributes, record_info(fields, ?TRIE)},
|
||||||
|
{type, ordered_set},
|
||||||
|
{storage_properties, StoreProps}]),
|
||||||
|
ok = mria:create_table(?SESSION_TRIE_DISC,
|
||||||
|
[{rlog_shard, ?ROUTE_SHARD},
|
||||||
|
{storage, disc_copies},
|
||||||
{record_name, ?TRIE},
|
{record_name, ?TRIE},
|
||||||
{attributes, record_info(fields, ?TRIE)},
|
{attributes, record_info(fields, ?TRIE)},
|
||||||
{type, ordered_set},
|
{type, ordered_set},
|
||||||
{storage_properties, StoreProps}]).
|
{storage_properties, StoreProps}]).
|
||||||
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Topics APIs
|
%% Topics APIs
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -99,7 +114,7 @@ insert(Topic) when is_binary(Topic) ->
|
||||||
|
|
||||||
-spec(insert_session(emqx_topic:topic()) -> ok).
|
-spec(insert_session(emqx_topic:topic()) -> ok).
|
||||||
insert_session(Topic) when is_binary(Topic) ->
|
insert_session(Topic) when is_binary(Topic) ->
|
||||||
insert(Topic, ?SESSION_TRIE).
|
insert(Topic, ?ACTIVE_SESSION_TRIE).
|
||||||
|
|
||||||
insert(Topic, Trie) when is_binary(Topic) ->
|
insert(Topic, Trie) when is_binary(Topic) ->
|
||||||
{TopicKey, PrefixKeys} = make_keys(Topic),
|
{TopicKey, PrefixKeys} = make_keys(Topic),
|
||||||
|
@ -116,7 +131,7 @@ delete(Topic) when is_binary(Topic) ->
|
||||||
%% @doc Delete a topic filter from the trie.
|
%% @doc Delete a topic filter from the trie.
|
||||||
-spec(delete_session(emqx_topic:topic()) -> ok).
|
-spec(delete_session(emqx_topic:topic()) -> ok).
|
||||||
delete_session(Topic) when is_binary(Topic) ->
|
delete_session(Topic) when is_binary(Topic) ->
|
||||||
delete(Topic, ?SESSION_TRIE).
|
delete(Topic, ?ACTIVE_SESSION_TRIE).
|
||||||
|
|
||||||
delete(Topic, Trie) when is_binary(Topic) ->
|
delete(Topic, Trie) when is_binary(Topic) ->
|
||||||
{TopicKey, PrefixKeys} = make_keys(Topic),
|
{TopicKey, PrefixKeys} = make_keys(Topic),
|
||||||
|
@ -132,7 +147,7 @@ match(Topic) when is_binary(Topic) ->
|
||||||
|
|
||||||
-spec(match_session(emqx_topic:topic()) -> list(emqx_topic:topic())).
|
-spec(match_session(emqx_topic:topic()) -> list(emqx_topic:topic())).
|
||||||
match_session(Topic) when is_binary(Topic) ->
|
match_session(Topic) when is_binary(Topic) ->
|
||||||
match(Topic, ?SESSION_TRIE).
|
match(Topic, ?ACTIVE_SESSION_TRIE).
|
||||||
|
|
||||||
match(Topic, Trie) when is_binary(Topic) ->
|
match(Topic, Trie) when is_binary(Topic) ->
|
||||||
Words = emqx_topic:words(Topic),
|
Words = emqx_topic:words(Topic),
|
||||||
|
@ -155,7 +170,7 @@ match(Topic, Trie) when is_binary(Topic) ->
|
||||||
empty() -> empty(?TRIE).
|
empty() -> empty(?TRIE).
|
||||||
|
|
||||||
empty_session() ->
|
empty_session() ->
|
||||||
empty(?SESSION_TRIE).
|
empty(?ACTIVE_SESSION_TRIE).
|
||||||
|
|
||||||
empty(Trie) -> ets:first(Trie) =:= '$end_of_table'.
|
empty(Trie) -> ets:first(Trie) =:= '$end_of_table'.
|
||||||
|
|
||||||
|
@ -165,7 +180,7 @@ lock_tables() ->
|
||||||
|
|
||||||
-spec lock_session_tables() -> ok.
|
-spec lock_session_tables() -> ok.
|
||||||
lock_session_tables() ->
|
lock_session_tables() ->
|
||||||
mnesia:write_lock_table(?SESSION_TRIE).
|
mnesia:write_lock_table(?ACTIVE_SESSION_TRIE).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Internal functions
|
%% Internal functions
|
||||||
|
|
|
@ -124,15 +124,15 @@ init_per_group(gc_tests, Config) ->
|
||||||
receive stop -> ok end
|
receive stop -> ok end
|
||||||
end),
|
end),
|
||||||
meck:new(mnesia, [non_strict, passthrough, no_history, no_link]),
|
meck:new(mnesia, [non_strict, passthrough, no_history, no_link]),
|
||||||
meck:expect(mnesia, dirty_first, fun(?SESS_MSG_TAB) -> ets:first(SessionMsgEts);
|
meck:expect(mnesia, dirty_first, fun(emqx_session_msg_ram) -> ets:first(SessionMsgEts);
|
||||||
(?MSG_TAB) -> ets:first(MsgEts);
|
(emqx_persistent_msg_ram) -> ets:first(MsgEts);
|
||||||
(X) -> meck:passthrough(X)
|
(X) -> meck:passthrough(X)
|
||||||
end),
|
end),
|
||||||
meck:expect(mnesia, dirty_next, fun(?SESS_MSG_TAB, X) -> ets:next(SessionMsgEts, X);
|
meck:expect(mnesia, dirty_next, fun(emqx_session_msg_ram, X) -> ets:next(SessionMsgEts, X);
|
||||||
(?MSG_TAB, X) -> ets:next(MsgEts, X);
|
(emqx_persistent_msg_ram, X) -> ets:next(MsgEts, X);
|
||||||
(Tab, X) -> meck:passthrough([Tab, X])
|
(Tab, X) -> meck:passthrough([Tab, X])
|
||||||
end),
|
end),
|
||||||
meck:expect(mnesia, dirty_delete, fun(?MSG_TAB, X) -> ets:delete(MsgEts, X);
|
meck:expect(mnesia, dirty_delete, fun(emqx_persistent_msg_ram, X) -> ets:delete(MsgEts, X);
|
||||||
(Tab, X) -> meck:passthrough([Tab, X])
|
(Tab, X) -> meck:passthrough([Tab, X])
|
||||||
end),
|
end),
|
||||||
[{store_owner, Pid}, {session_msg_store, SessionMsgEts}, {msg_store, MsgEts} | Config].
|
[{store_owner, Pid}, {session_msg_store, SessionMsgEts}, {msg_store, MsgEts} | Config].
|
||||||
|
|
Loading…
Reference in New Issue