Compare commits
8 Commits
master
...
extend-per
Author | SHA1 | Date |
---|---|---|
![]() |
5643eba166 | |
![]() |
03309ccd7c | |
![]() |
bb040ed95e | |
![]() |
b4d99bc176 | |
![]() |
ab3526260c | |
![]() |
716dcd0086 | |
![]() |
9481a77a01 | |
![]() |
d2bc5e77a7 |
|
@ -1646,6 +1646,12 @@ persistent_session_store {
|
||||||
## ValueType: Boolean
|
## ValueType: Boolean
|
||||||
## Default: false
|
## Default: false
|
||||||
enabled = false
|
enabled = false
|
||||||
|
## Which database backend should be used
|
||||||
|
##
|
||||||
|
## @doc persistent_session_store.db_backend
|
||||||
|
## ValueType: mnesia_ram | mnesia_disc
|
||||||
|
## Default: mnesia_ram
|
||||||
|
db_backend = mnesia_ram
|
||||||
|
|
||||||
## How long are undelivered messages retained in the store
|
## How long are undelivered messages retained in the store
|
||||||
##
|
##
|
||||||
|
|
|
@ -116,3 +116,7 @@
|
||||||
|
|
||||||
## patches dir
|
## patches dir
|
||||||
-pa {{ platform_data_dir }}/patches
|
-pa {{ platform_data_dir }}/patches
|
||||||
|
|
||||||
|
## Mnesia thresholds
|
||||||
|
-mnesia dump_log_write_threshold 5000
|
||||||
|
-mnesia dump_log_time_threshold 60000
|
||||||
|
|
|
@ -114,3 +114,7 @@
|
||||||
|
|
||||||
## patches dir
|
## patches dir
|
||||||
-pa {{ platform_data_dir }}/patches
|
-pa {{ platform_data_dir }}/patches
|
||||||
|
|
||||||
|
## Mnesia thresholds
|
||||||
|
-mnesia dump_log_write_threshold 5000
|
||||||
|
-mnesia dump_log_time_threshold 60000
|
||||||
|
|
|
@ -1179,20 +1179,27 @@ terminate(_, #channel{conn_state = idle}) -> ok;
|
||||||
terminate(normal, Channel) ->
|
terminate(normal, Channel) ->
|
||||||
run_terminate_hook(normal, Channel);
|
run_terminate_hook(normal, Channel);
|
||||||
terminate({shutdown, kicked}, Channel) ->
|
terminate({shutdown, kicked}, Channel) ->
|
||||||
_ = emqx_persistent_session:persist(Channel#channel.clientinfo,
|
persist_if_session(Channel),
|
||||||
Channel#channel.conninfo,
|
|
||||||
Channel#channel.session),
|
|
||||||
run_terminate_hook(kicked, Channel);
|
run_terminate_hook(kicked, Channel);
|
||||||
terminate({shutdown, Reason}, Channel) when Reason =:= discarded;
|
terminate({shutdown, Reason}, Channel) when Reason =:= discarded;
|
||||||
Reason =:= takeovered ->
|
Reason =:= takeovered ->
|
||||||
run_terminate_hook(Reason, Channel);
|
run_terminate_hook(Reason, Channel);
|
||||||
terminate(Reason, Channel = #channel{will_msg = WillMsg}) ->
|
terminate(Reason, Channel = #channel{will_msg = WillMsg}) ->
|
||||||
(WillMsg =/= undefined) andalso publish_will_msg(WillMsg),
|
(WillMsg =/= undefined) andalso publish_will_msg(WillMsg),
|
||||||
_ = emqx_persistent_session:persist(Channel#channel.clientinfo,
|
persist_if_session(Channel),
|
||||||
Channel#channel.conninfo,
|
|
||||||
Channel#channel.session),
|
|
||||||
run_terminate_hook(Reason, Channel).
|
run_terminate_hook(Reason, Channel).
|
||||||
|
|
||||||
|
persist_if_session(#channel{session = Session} = Channel) ->
|
||||||
|
case emqx_session:is_session(Session) of
|
||||||
|
true ->
|
||||||
|
_ = emqx_persistent_session:persist(Channel#channel.clientinfo,
|
||||||
|
Channel#channel.conninfo,
|
||||||
|
Channel#channel.session),
|
||||||
|
ok;
|
||||||
|
false ->
|
||||||
|
ok
|
||||||
|
end.
|
||||||
|
|
||||||
run_terminate_hook(_Reason, #channel{session = undefined}) -> ok;
|
run_terminate_hook(_Reason, #channel{session = undefined}) -> ok;
|
||||||
run_terminate_hook(Reason, #channel{clientinfo = ClientInfo, session = Session}) ->
|
run_terminate_hook(Reason, #channel{clientinfo = ClientInfo, session = Session}) ->
|
||||||
emqx_session:terminate(ClientInfo, Reason, Session).
|
emqx_session:terminate(ClientInfo, Reason, Session).
|
||||||
|
|
|
@ -58,7 +58,9 @@
|
||||||
, lookup_channels/2
|
, lookup_channels/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([all_channels/0]).
|
-export([ all_channels/0
|
||||||
|
, all_client_ids/0
|
||||||
|
]).
|
||||||
|
|
||||||
%% gen_server callbacks
|
%% gen_server callbacks
|
||||||
-export([ init/1
|
-export([ init/1
|
||||||
|
@ -400,6 +402,11 @@ all_channels() ->
|
||||||
Pat = [{{'_', '$1'}, [], ['$1']}],
|
Pat = [{{'_', '$1'}, [], ['$1']}],
|
||||||
ets:select(?CHAN_TAB, Pat).
|
ets:select(?CHAN_TAB, Pat).
|
||||||
|
|
||||||
|
all_client_ids() ->
|
||||||
|
Pat = [{{'$1', '_'}, [], ['$1']}],
|
||||||
|
ets:select(?CHAN_TAB, Pat).
|
||||||
|
|
||||||
|
|
||||||
%% @doc Lookup channels.
|
%% @doc Lookup channels.
|
||||||
-spec(lookup_channels(emqx_types:clientid()) -> list(chan_pid())).
|
-spec(lookup_channels(emqx_types:clientid()) -> list(chan_pid())).
|
||||||
lookup_channels(ClientId) ->
|
lookup_channels(ClientId) ->
|
||||||
|
|
|
@ -82,12 +82,17 @@
|
||||||
init_db_backend() ->
|
init_db_backend() ->
|
||||||
case is_store_enabled() of
|
case is_store_enabled() of
|
||||||
true ->
|
true ->
|
||||||
|
Backend =
|
||||||
|
case emqx_config:get(?db_backend_key) of
|
||||||
|
mnesia_ram -> emqx_persistent_session_mnesia_ram_backend;
|
||||||
|
mnesia_disc -> emqx_persistent_session_mnesia_disc_backend
|
||||||
|
end,
|
||||||
|
persistent_term:put(?db_backend_module, Backend),
|
||||||
|
Backend:create_tables(),
|
||||||
ok = emqx_trie:create_session_trie(),
|
ok = emqx_trie:create_session_trie(),
|
||||||
emqx_persistent_session_mnesia_backend:create_tables(),
|
|
||||||
persistent_term:put(?db_backend_key, emqx_persistent_session_mnesia_backend),
|
|
||||||
ok;
|
ok;
|
||||||
false ->
|
false ->
|
||||||
persistent_term:put(?db_backend_key, emqx_persistent_session_dummy_backend),
|
persistent_term:put(?db_backend_module, emqx_persistent_session_dummy_backend),
|
||||||
ok
|
ok
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
|
@ -14,10 +14,6 @@
|
||||||
%% limitations under the License.
|
%% limitations under the License.
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
-define(SESSION_STORE, emqx_session_store).
|
|
||||||
-define(SESS_MSG_TAB, emqx_session_msg).
|
|
||||||
-define(MSG_TAB, emqx_persistent_msg).
|
|
||||||
|
|
||||||
-record(session_store, { client_id :: binary()
|
-record(session_store, { client_id :: binary()
|
||||||
, expiry_interval :: non_neg_integer()
|
, expiry_interval :: non_neg_integer()
|
||||||
, ts :: non_neg_integer()
|
, ts :: non_neg_integer()
|
||||||
|
@ -30,4 +26,5 @@
|
||||||
-define(is_enabled_key, [persistent_session_store, enabled]).
|
-define(is_enabled_key, [persistent_session_store, enabled]).
|
||||||
-define(msg_retain, [persistent_session_store, max_retain_undelivered]).
|
-define(msg_retain, [persistent_session_store, max_retain_undelivered]).
|
||||||
|
|
||||||
-define(db_backend, (persistent_term:get(?db_backend_key))).
|
-define(db_backend_module, [persistent_session_store, db_backend_module]).
|
||||||
|
-define(db_backend, (persistent_term:get(?db_backend_module))).
|
||||||
|
|
|
@ -14,11 +14,15 @@
|
||||||
%% limitations under the License.
|
%% limitations under the License.
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
-module(emqx_persistent_session_mnesia_backend).
|
-module(emqx_persistent_session_mnesia_disc_backend).
|
||||||
|
|
||||||
-include("emqx.hrl").
|
-include("emqx.hrl").
|
||||||
-include("emqx_persistent_session.hrl").
|
-include("emqx_persistent_session.hrl").
|
||||||
|
|
||||||
|
-define(SESSION_STORE, emqx_session_store_disc).
|
||||||
|
-define(SESS_MSG_TAB, emqx_session_msg_disc).
|
||||||
|
-define(MSG_TAB, emqx_persistent_msg_disc).
|
||||||
|
|
||||||
-export([ create_tables/0
|
-export([ create_tables/0
|
||||||
, first_message_id/0
|
, first_message_id/0
|
||||||
, next_message_id/1
|
, next_message_id/1
|
|
@ -0,0 +1,114 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqx_persistent_session_mnesia_ram_backend).
|
||||||
|
|
||||||
|
-include("emqx.hrl").
|
||||||
|
-include("emqx_persistent_session.hrl").
|
||||||
|
|
||||||
|
-define(SESSION_STORE, emqx_session_store_ram).
|
||||||
|
-define(SESS_MSG_TAB, emqx_session_msg_ram).
|
||||||
|
-define(MSG_TAB, emqx_persistent_msg_ram).
|
||||||
|
|
||||||
|
-export([ create_tables/0
|
||||||
|
, first_message_id/0
|
||||||
|
, next_message_id/1
|
||||||
|
, delete_message/1
|
||||||
|
, first_session_message/0
|
||||||
|
, next_session_message/1
|
||||||
|
, delete_session_message/1
|
||||||
|
, put_session_store/1
|
||||||
|
, delete_session_store/1
|
||||||
|
, lookup_session_store/1
|
||||||
|
, put_session_message/1
|
||||||
|
, put_message/1
|
||||||
|
, get_message/1
|
||||||
|
, ro_transaction/1
|
||||||
|
]).
|
||||||
|
|
||||||
|
create_tables() ->
|
||||||
|
ok = mria:create_table(?SESSION_STORE, [
|
||||||
|
{type, set},
|
||||||
|
{rlog_shard, ?PERSISTENT_SESSION_SHARD},
|
||||||
|
{storage, ram_copies},
|
||||||
|
{record_name, session_store},
|
||||||
|
{attributes, record_info(fields, session_store)},
|
||||||
|
{storage_properties, [{ets, [{read_concurrency, true}]}]}]),
|
||||||
|
|
||||||
|
ok = mria:create_table(?SESS_MSG_TAB, [
|
||||||
|
{type, ordered_set},
|
||||||
|
{rlog_shard, ?PERSISTENT_SESSION_SHARD},
|
||||||
|
{storage, ram_copies},
|
||||||
|
{record_name, session_msg},
|
||||||
|
{attributes, record_info(fields, session_msg)},
|
||||||
|
{storage_properties, [{ets, [{read_concurrency, true},
|
||||||
|
{write_concurrency, true}]}]}]),
|
||||||
|
|
||||||
|
ok = mria:create_table(?MSG_TAB, [
|
||||||
|
{type, ordered_set},
|
||||||
|
{rlog_shard, ?PERSISTENT_SESSION_SHARD},
|
||||||
|
{storage, ram_copies},
|
||||||
|
{record_name, message},
|
||||||
|
{attributes, record_info(fields, message)},
|
||||||
|
{storage_properties, [{ets, [{read_concurrency, true},
|
||||||
|
{write_concurrency, true}]}]}]).
|
||||||
|
|
||||||
|
first_session_message() ->
|
||||||
|
mnesia:dirty_first(?SESS_MSG_TAB).
|
||||||
|
|
||||||
|
next_session_message(Key) ->
|
||||||
|
mnesia:dirty_next(?SESS_MSG_TAB, Key).
|
||||||
|
|
||||||
|
first_message_id() ->
|
||||||
|
mnesia:dirty_first(?MSG_TAB).
|
||||||
|
|
||||||
|
next_message_id(Key) ->
|
||||||
|
mnesia:dirty_next(?MSG_TAB, Key).
|
||||||
|
|
||||||
|
delete_message(Key) ->
|
||||||
|
mria:dirty_delete(?MSG_TAB, Key).
|
||||||
|
|
||||||
|
delete_session_message(Key) ->
|
||||||
|
mria:dirty_delete(?SESS_MSG_TAB, Key).
|
||||||
|
|
||||||
|
put_session_store(SS) ->
|
||||||
|
mria:dirty_write(?SESSION_STORE, SS).
|
||||||
|
|
||||||
|
delete_session_store(ClientID) ->
|
||||||
|
mria:dirty_delete(?SESSION_STORE, ClientID).
|
||||||
|
|
||||||
|
lookup_session_store(ClientID) ->
|
||||||
|
case mnesia:dirty_read(?SESSION_STORE, ClientID) of
|
||||||
|
[] -> none;
|
||||||
|
[SS] -> {value, SS}
|
||||||
|
end.
|
||||||
|
|
||||||
|
put_session_message(SessMsg) ->
|
||||||
|
mria:dirty_write(?SESS_MSG_TAB, SessMsg).
|
||||||
|
|
||||||
|
put_message(Msg) ->
|
||||||
|
mria:dirty_write(?MSG_TAB, Msg).
|
||||||
|
|
||||||
|
get_message(MsgId) ->
|
||||||
|
case mnesia:read(?MSG_TAB, MsgId) of
|
||||||
|
[] -> error({msg_not_found, MsgId});
|
||||||
|
[Msg] -> Msg
|
||||||
|
end.
|
||||||
|
|
||||||
|
ro_transaction(Fun) ->
|
||||||
|
{atomic, Res} = mria:ro_transaction(?PERSISTENT_SESSION_SHARD, Fun),
|
||||||
|
Res.
|
||||||
|
|
|
@ -160,6 +160,18 @@ fields("persistent_session_store") ->
|
||||||
sc(boolean(),
|
sc(boolean(),
|
||||||
#{ default => "false"
|
#{ default => "false"
|
||||||
})},
|
})},
|
||||||
|
{"db_backend",
|
||||||
|
sc(hoconsc:union([mnesia_ram, mnesia_disc]),
|
||||||
|
#{ default => "mnesia_ram"
|
||||||
|
, desc => """
|
||||||
|
Which database backend should be used for persistent session storage.<br>
|
||||||
|
mnesia_ram: Mnesia table, RAM only.<br>
|
||||||
|
mnesia_disc: Mnesia table, in RAM but also copy to disc.<br>
|
||||||
|
When running in a single node, it's recommended to have the session states copied to disc.<br>
|
||||||
|
When clustered, session states are replicated to all the nodes in the cluster,
|
||||||
|
most of the time, RAM only is good enough.
|
||||||
|
"""
|
||||||
|
})},
|
||||||
{"max_retain_undelivered",
|
{"max_retain_undelivered",
|
||||||
sc(duration(),
|
sc(duration(),
|
||||||
#{ default => "1h"
|
#{ default => "1h"
|
||||||
|
|
|
@ -58,6 +58,7 @@
|
||||||
|
|
||||||
-export([ info/1
|
-export([ info/1
|
||||||
, info/2
|
, info/2
|
||||||
|
, is_session/1
|
||||||
, stats/1
|
, stats/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
@ -202,6 +203,9 @@ init(Opts) ->
|
||||||
%% Info, Stats
|
%% Info, Stats
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
is_session(#session{}) -> true;
|
||||||
|
is_session(_) -> false.
|
||||||
|
|
||||||
%% @doc Get infos of the session.
|
%% @doc Get infos of the session.
|
||||||
-spec(info(session()) -> emqx_types:infos()).
|
-spec(info(session()) -> emqx_types:infos()).
|
||||||
info(Session) ->
|
info(Session) ->
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
-module(emqx_trie).
|
-module(emqx_trie).
|
||||||
|
|
||||||
-include("emqx.hrl").
|
-include("emqx.hrl").
|
||||||
|
-include("emqx_persistent_session.hrl").
|
||||||
|
|
||||||
%% Mnesia bootstrap
|
%% Mnesia bootstrap
|
||||||
-export([ mnesia/1
|
-export([ mnesia/1
|
||||||
|
@ -48,7 +49,8 @@
|
||||||
-endif.
|
-endif.
|
||||||
|
|
||||||
-define(TRIE, emqx_trie).
|
-define(TRIE, emqx_trie).
|
||||||
-define(SESSION_TRIE, emqx_session_trie).
|
-define(SESSION_TRIE_RAM, emqx_session_trie_ram).
|
||||||
|
-define(SESSION_TRIE_DISC, emqx_session_trie_disc).
|
||||||
-define(PREFIX(Prefix), {Prefix, 0}).
|
-define(PREFIX(Prefix), {Prefix, 0}).
|
||||||
-define(TOPIC(Topic), {Topic, 1}).
|
-define(TOPIC(Topic), {Topic, 1}).
|
||||||
|
|
||||||
|
@ -57,6 +59,12 @@
|
||||||
, count = 0 :: non_neg_integer()
|
, count = 0 :: non_neg_integer()
|
||||||
}).
|
}).
|
||||||
|
|
||||||
|
-define(ACTIVE_SESSION_TRIE, case emqx_config:get(?db_backend_key) of
|
||||||
|
mnesia_ram -> ?SESSION_TRIE_RAM;
|
||||||
|
mnesia_disc -> ?SESSION_TRIE_DISC
|
||||||
|
end).
|
||||||
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Mnesia bootstrap
|
%% Mnesia bootstrap
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -79,7 +87,14 @@ create_session_trie() ->
|
||||||
StoreProps = [{ets, [{read_concurrency, true},
|
StoreProps = [{ets, [{read_concurrency, true},
|
||||||
{write_concurrency, true}
|
{write_concurrency, true}
|
||||||
]}],
|
]}],
|
||||||
ok = mria:create_table(?SESSION_TRIE,
|
ok = mria:create_table(?SESSION_TRIE_RAM,
|
||||||
|
[{rlog_shard, ?ROUTE_SHARD},
|
||||||
|
{storage, ram_copies},
|
||||||
|
{record_name, ?TRIE},
|
||||||
|
{attributes, record_info(fields, ?TRIE)},
|
||||||
|
{type, ordered_set},
|
||||||
|
{storage_properties, StoreProps}]),
|
||||||
|
ok = mria:create_table(?SESSION_TRIE_DISC,
|
||||||
[{rlog_shard, ?ROUTE_SHARD},
|
[{rlog_shard, ?ROUTE_SHARD},
|
||||||
{storage, disc_copies},
|
{storage, disc_copies},
|
||||||
{record_name, ?TRIE},
|
{record_name, ?TRIE},
|
||||||
|
@ -87,6 +102,7 @@ create_session_trie() ->
|
||||||
{type, ordered_set},
|
{type, ordered_set},
|
||||||
{storage_properties, StoreProps}]).
|
{storage_properties, StoreProps}]).
|
||||||
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Topics APIs
|
%% Topics APIs
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -98,7 +114,7 @@ insert(Topic) when is_binary(Topic) ->
|
||||||
|
|
||||||
-spec(insert_session(emqx_topic:topic()) -> ok).
|
-spec(insert_session(emqx_topic:topic()) -> ok).
|
||||||
insert_session(Topic) when is_binary(Topic) ->
|
insert_session(Topic) when is_binary(Topic) ->
|
||||||
insert(Topic, ?SESSION_TRIE).
|
insert(Topic, ?ACTIVE_SESSION_TRIE).
|
||||||
|
|
||||||
insert(Topic, Trie) when is_binary(Topic) ->
|
insert(Topic, Trie) when is_binary(Topic) ->
|
||||||
{TopicKey, PrefixKeys} = make_keys(Topic),
|
{TopicKey, PrefixKeys} = make_keys(Topic),
|
||||||
|
@ -115,7 +131,7 @@ delete(Topic) when is_binary(Topic) ->
|
||||||
%% @doc Delete a topic filter from the trie.
|
%% @doc Delete a topic filter from the trie.
|
||||||
-spec(delete_session(emqx_topic:topic()) -> ok).
|
-spec(delete_session(emqx_topic:topic()) -> ok).
|
||||||
delete_session(Topic) when is_binary(Topic) ->
|
delete_session(Topic) when is_binary(Topic) ->
|
||||||
delete(Topic, ?SESSION_TRIE).
|
delete(Topic, ?ACTIVE_SESSION_TRIE).
|
||||||
|
|
||||||
delete(Topic, Trie) when is_binary(Topic) ->
|
delete(Topic, Trie) when is_binary(Topic) ->
|
||||||
{TopicKey, PrefixKeys} = make_keys(Topic),
|
{TopicKey, PrefixKeys} = make_keys(Topic),
|
||||||
|
@ -131,7 +147,7 @@ match(Topic) when is_binary(Topic) ->
|
||||||
|
|
||||||
-spec(match_session(emqx_topic:topic()) -> list(emqx_topic:topic())).
|
-spec(match_session(emqx_topic:topic()) -> list(emqx_topic:topic())).
|
||||||
match_session(Topic) when is_binary(Topic) ->
|
match_session(Topic) when is_binary(Topic) ->
|
||||||
match(Topic, ?SESSION_TRIE).
|
match(Topic, ?ACTIVE_SESSION_TRIE).
|
||||||
|
|
||||||
match(Topic, Trie) when is_binary(Topic) ->
|
match(Topic, Trie) when is_binary(Topic) ->
|
||||||
Words = emqx_topic:words(Topic),
|
Words = emqx_topic:words(Topic),
|
||||||
|
@ -154,7 +170,7 @@ match(Topic, Trie) when is_binary(Topic) ->
|
||||||
empty() -> empty(?TRIE).
|
empty() -> empty(?TRIE).
|
||||||
|
|
||||||
empty_session() ->
|
empty_session() ->
|
||||||
empty(?SESSION_TRIE).
|
empty(?ACTIVE_SESSION_TRIE).
|
||||||
|
|
||||||
empty(Trie) -> ets:first(Trie) =:= '$end_of_table'.
|
empty(Trie) -> ets:first(Trie) =:= '$end_of_table'.
|
||||||
|
|
||||||
|
@ -164,7 +180,7 @@ lock_tables() ->
|
||||||
|
|
||||||
-spec lock_session_tables() -> ok.
|
-spec lock_session_tables() -> ok.
|
||||||
lock_session_tables() ->
|
lock_session_tables() ->
|
||||||
mnesia:write_lock_table(?SESSION_TRIE).
|
mnesia:write_lock_table(?ACTIVE_SESSION_TRIE).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Internal functions
|
%% Internal functions
|
||||||
|
|
|
@ -113,6 +113,8 @@ init_per_group(snabbkaffe, Config) ->
|
||||||
[ {kill_connection_process, true} | Config];
|
[ {kill_connection_process, true} | Config];
|
||||||
init_per_group(gc_tests, Config) ->
|
init_per_group(gc_tests, Config) ->
|
||||||
%% We need to make sure the system does not interfere with this test group.
|
%% We need to make sure the system does not interfere with this test group.
|
||||||
|
[maybe_kill_connection_process(ClientId, [{kill_connection_process, true}])
|
||||||
|
|| ClientId <- emqx_cm:all_client_ids()],
|
||||||
emqx_common_test_helpers:stop_apps([]),
|
emqx_common_test_helpers:stop_apps([]),
|
||||||
SessionMsgEts = gc_tests_session_store,
|
SessionMsgEts = gc_tests_session_store,
|
||||||
MsgEts = gc_tests_msg_store,
|
MsgEts = gc_tests_msg_store,
|
||||||
|
@ -122,15 +124,15 @@ init_per_group(gc_tests, Config) ->
|
||||||
receive stop -> ok end
|
receive stop -> ok end
|
||||||
end),
|
end),
|
||||||
meck:new(mnesia, [non_strict, passthrough, no_history, no_link]),
|
meck:new(mnesia, [non_strict, passthrough, no_history, no_link]),
|
||||||
meck:expect(mnesia, dirty_first, fun(?SESS_MSG_TAB) -> ets:first(SessionMsgEts);
|
meck:expect(mnesia, dirty_first, fun(emqx_session_msg_ram) -> ets:first(SessionMsgEts);
|
||||||
(?MSG_TAB) -> ets:first(MsgEts);
|
(emqx_persistent_msg_ram) -> ets:first(MsgEts);
|
||||||
(X) -> meck:passthrough(X)
|
(X) -> meck:passthrough(X)
|
||||||
end),
|
end),
|
||||||
meck:expect(mnesia, dirty_next, fun(?SESS_MSG_TAB, X) -> ets:next(SessionMsgEts, X);
|
meck:expect(mnesia, dirty_next, fun(emqx_session_msg_ram, X) -> ets:next(SessionMsgEts, X);
|
||||||
(?MSG_TAB, X) -> ets:next(MsgEts, X);
|
(emqx_persistent_msg_ram, X) -> ets:next(MsgEts, X);
|
||||||
(Tab, X) -> meck:passthrough([Tab, X])
|
(Tab, X) -> meck:passthrough([Tab, X])
|
||||||
end),
|
end),
|
||||||
meck:expect(mnesia, dirty_delete, fun(?MSG_TAB, X) -> ets:delete(MsgEts, X);
|
meck:expect(mnesia, dirty_delete, fun(emqx_persistent_msg_ram, X) -> ets:delete(MsgEts, X);
|
||||||
(Tab, X) -> meck:passthrough([Tab, X])
|
(Tab, X) -> meck:passthrough([Tab, X])
|
||||||
end),
|
end),
|
||||||
[{store_owner, Pid}, {session_msg_store, SessionMsgEts}, {msg_store, MsgEts} | Config].
|
[{store_owner, Pid}, {session_msg_store, SessionMsgEts}, {msg_store, MsgEts} | Config].
|
||||||
|
@ -230,50 +232,70 @@ receive_messages(Count, Msgs) ->
|
||||||
maybe_kill_connection_process(ClientId, Config) ->
|
maybe_kill_connection_process(ClientId, Config) ->
|
||||||
case ?config(kill_connection_process, Config) of
|
case ?config(kill_connection_process, Config) of
|
||||||
true ->
|
true ->
|
||||||
[ConnectionPid] = emqx_cm:lookup_channels(ClientId),
|
case emqx_cm:lookup_channels(ClientId) of
|
||||||
?assert(is_pid(ConnectionPid)),
|
[] ->
|
||||||
Ref = monitor(process, ConnectionPid),
|
ok;
|
||||||
ConnectionPid ! die_if_test,
|
[ConnectionPid] ->
|
||||||
receive {'DOWN', Ref, process, ConnectionPid, normal} -> ok
|
?assert(is_pid(ConnectionPid)),
|
||||||
after 3000 -> error(process_did_not_die)
|
Ref = monitor(process, ConnectionPid),
|
||||||
|
ConnectionPid ! die_if_test,
|
||||||
|
receive {'DOWN', Ref, process, ConnectionPid, normal} -> ok
|
||||||
|
after 3000 -> error(process_did_not_die)
|
||||||
|
end,
|
||||||
|
wait_for_cm_unregister(ClientId)
|
||||||
end;
|
end;
|
||||||
false ->
|
false ->
|
||||||
ok
|
ok
|
||||||
end.
|
end.
|
||||||
|
|
||||||
snabbkaffe_sync_publish(Topic, Payloads, Config) ->
|
wait_for_cm_unregister(ClientId) ->
|
||||||
|
wait_for_cm_unregister(ClientId, 10).
|
||||||
|
|
||||||
|
wait_for_cm_unregister(_ClientId, 0) ->
|
||||||
|
error(cm_did_not_unregister);
|
||||||
|
wait_for_cm_unregister(ClientId, N) ->
|
||||||
|
case emqx_cm:lookup_channels(ClientId) of
|
||||||
|
[] -> ok;
|
||||||
|
[_] -> timer:sleep(100), wait_for_cm_unregister(ClientId, N - 1)
|
||||||
|
end.
|
||||||
|
|
||||||
|
snabbkaffe_sync_publish(Topic, Payloads) ->
|
||||||
Fun = fun(Client, Payload) ->
|
Fun = fun(Client, Payload) ->
|
||||||
?wait_async_action( {ok, _} = emqtt:publish(Client, Topic, Payload, 2)
|
?wait_async_action( {ok, _} = emqtt:publish(Client, Topic, Payload, 2)
|
||||||
, #{?snk_kind := ps_persist_msg, payload := Payload}
|
, #{?snk_kind := ps_persist_msg, payload := Payload}
|
||||||
)
|
)
|
||||||
end,
|
end,
|
||||||
do_publish(Payloads, Fun, Config).
|
do_publish(Payloads, Fun, true).
|
||||||
|
|
||||||
publish(Topic, Payloads, Config) ->
|
publish(Topic, Payloads) ->
|
||||||
Fun = fun(Client, Payload) ->
|
Fun = fun(Client, Payload) ->
|
||||||
{ok, _} = emqtt:publish(Client, Topic, Payload, 2)
|
{ok, _} = emqtt:publish(Client, Topic, Payload, 2)
|
||||||
end,
|
end,
|
||||||
do_publish(Payloads, Fun, Config).
|
do_publish(Payloads, Fun, false).
|
||||||
|
|
||||||
do_publish(Payloads = [_|_], PublishFun, Config) ->
|
do_publish(Payloads = [_|_], PublishFun, WaitForUnregister) ->
|
||||||
%% Publish from another process to avoid connection confusion.
|
%% Publish from another process to avoid connection confusion.
|
||||||
{Pid, Ref} =
|
{Pid, Ref} =
|
||||||
spawn_monitor(
|
spawn_monitor(
|
||||||
fun() ->
|
fun() ->
|
||||||
%% For convenience, always publish using tcp.
|
%% For convenience, always publish using tcp.
|
||||||
%% The publish path is not what we are testing.
|
%% The publish path is not what we are testing.
|
||||||
|
ClientID = <<"ps_SUITE_publisher">>,
|
||||||
{ok, Client} = emqtt:start_link([ {proto_ver, v5}
|
{ok, Client} = emqtt:start_link([ {proto_ver, v5}
|
||||||
|
, {clientid, ClientID}
|
||||||
, {port, 1883} ]),
|
, {port, 1883} ]),
|
||||||
{ok, _} = emqtt:connect(Client),
|
{ok, _} = emqtt:connect(Client),
|
||||||
lists:foreach(fun(Payload) -> PublishFun(Client, Payload) end, Payloads),
|
lists:foreach(fun(Payload) -> PublishFun(Client, Payload) end, Payloads),
|
||||||
ok = emqtt:disconnect(Client)
|
ok = emqtt:disconnect(Client),
|
||||||
|
%% Snabbkaffe sometimes fails unless all processes are gone.
|
||||||
|
[wait_for_cm_unregister(ClientID) || WaitForUnregister]
|
||||||
end),
|
end),
|
||||||
receive
|
receive
|
||||||
{'DOWN', Ref, process, Pid, normal} -> ok;
|
{'DOWN', Ref, process, Pid, normal} -> ok;
|
||||||
{'DOWN', Ref, process, Pid, What} -> error({failed_publish, What})
|
{'DOWN', Ref, process, Pid, What} -> error({failed_publish, What})
|
||||||
end;
|
end;
|
||||||
do_publish(Payload, PublishFun, Config) ->
|
do_publish(Payload, PublishFun, WaitForUnregister) ->
|
||||||
do_publish([Payload], PublishFun, Config).
|
do_publish([Payload], PublishFun, WaitForUnregister).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Test Cases
|
%% Test Cases
|
||||||
|
@ -297,7 +319,7 @@ t_connect_session_expiry_interval(Config) ->
|
||||||
|
|
||||||
maybe_kill_connection_process(ClientId, Config),
|
maybe_kill_connection_process(ClientId, Config),
|
||||||
|
|
||||||
publish(Topic, Payload, Config),
|
publish(Topic, Payload),
|
||||||
|
|
||||||
{ok, Client2} = emqtt:start_link([ {clientid, ClientId},
|
{ok, Client2} = emqtt:start_link([ {clientid, ClientId},
|
||||||
{proto_ver, v5},
|
{proto_ver, v5},
|
||||||
|
@ -424,7 +446,7 @@ t_process_dies_session_expires(Config) ->
|
||||||
|
|
||||||
maybe_kill_connection_process(ClientId, Config),
|
maybe_kill_connection_process(ClientId, Config),
|
||||||
|
|
||||||
ok = publish(Topic, [Payload], Config),
|
ok = publish(Topic, [Payload]),
|
||||||
|
|
||||||
SessionId =
|
SessionId =
|
||||||
case ?config(persistent_store_enabled, Config) of
|
case ?config(persistent_store_enabled, Config) of
|
||||||
|
@ -498,7 +520,7 @@ t_publish_while_client_is_gone(Config) ->
|
||||||
ok = emqtt:disconnect(Client1),
|
ok = emqtt:disconnect(Client1),
|
||||||
maybe_kill_connection_process(ClientId, Config),
|
maybe_kill_connection_process(ClientId, Config),
|
||||||
|
|
||||||
ok = publish(Topic, [Payload1, Payload2], Config),
|
ok = publish(Topic, [Payload1, Payload2]),
|
||||||
|
|
||||||
{ok, Client2} = emqtt:start_link([ {proto_ver, v5},
|
{ok, Client2} = emqtt:start_link([ {proto_ver, v5},
|
||||||
{clientid, ClientId},
|
{clientid, ClientId},
|
||||||
|
@ -544,7 +566,7 @@ t_clean_start_drops_subscriptions(Config) ->
|
||||||
maybe_kill_connection_process(ClientId, Config),
|
maybe_kill_connection_process(ClientId, Config),
|
||||||
|
|
||||||
%% 2.
|
%% 2.
|
||||||
ok = publish(Topic, Payload1, Config),
|
ok = publish(Topic, Payload1),
|
||||||
|
|
||||||
%% 3.
|
%% 3.
|
||||||
{ok, Client2} = emqtt:start_link([ {proto_ver, v5},
|
{ok, Client2} = emqtt:start_link([ {proto_ver, v5},
|
||||||
|
@ -556,7 +578,7 @@ t_clean_start_drops_subscriptions(Config) ->
|
||||||
?assertEqual(0, client_info(session_present, Client2)),
|
?assertEqual(0, client_info(session_present, Client2)),
|
||||||
{ok, _, [2]} = emqtt:subscribe(Client2, STopic, qos2),
|
{ok, _, [2]} = emqtt:subscribe(Client2, STopic, qos2),
|
||||||
|
|
||||||
ok = publish(Topic, Payload2, Config),
|
ok = publish(Topic, Payload2),
|
||||||
[Msg1] = receive_messages(1),
|
[Msg1] = receive_messages(1),
|
||||||
?assertEqual({ok, iolist_to_binary(Payload2)}, maps:find(payload, Msg1)),
|
?assertEqual({ok, iolist_to_binary(Payload2)}, maps:find(payload, Msg1)),
|
||||||
|
|
||||||
|
@ -571,7 +593,7 @@ t_clean_start_drops_subscriptions(Config) ->
|
||||||
| Config]),
|
| Config]),
|
||||||
{ok, _} = emqtt:ConnFun(Client3),
|
{ok, _} = emqtt:ConnFun(Client3),
|
||||||
|
|
||||||
ok = publish(Topic, Payload3, Config),
|
ok = publish(Topic, Payload3),
|
||||||
[Msg2] = receive_messages(1),
|
[Msg2] = receive_messages(1),
|
||||||
?assertEqual({ok, iolist_to_binary(Payload3)}, maps:find(payload, Msg2)),
|
?assertEqual({ok, iolist_to_binary(Payload3)}, maps:find(payload, Msg2)),
|
||||||
|
|
||||||
|
@ -625,7 +647,7 @@ t_multiple_subscription_matches(Config) ->
|
||||||
|
|
||||||
maybe_kill_connection_process(ClientId, Config),
|
maybe_kill_connection_process(ClientId, Config),
|
||||||
|
|
||||||
publish(Topic, Payload, Config),
|
publish(Topic, Payload),
|
||||||
|
|
||||||
{ok, Client2} = emqtt:start_link([ {clientid, ClientId},
|
{ok, Client2} = emqtt:start_link([ {clientid, ClientId},
|
||||||
{proto_ver, v5},
|
{proto_ver, v5},
|
||||||
|
@ -675,9 +697,9 @@ t_lost_messages_because_of_gc(Config) ->
|
||||||
{ok, _, [2]} = emqtt:subscribe(Client1, STopic, qos2),
|
{ok, _, [2]} = emqtt:subscribe(Client1, STopic, qos2),
|
||||||
emqtt:disconnect(Client1),
|
emqtt:disconnect(Client1),
|
||||||
maybe_kill_connection_process(ClientId, Config),
|
maybe_kill_connection_process(ClientId, Config),
|
||||||
publish(Topic, Payload1, Config),
|
publish(Topic, Payload1),
|
||||||
timer:sleep(2 * Retain),
|
timer:sleep(2 * Retain),
|
||||||
publish(Topic, Payload2, Config),
|
publish(Topic, Payload2),
|
||||||
emqx_persistent_session_gc:message_gc_worker(),
|
emqx_persistent_session_gc:message_gc_worker(),
|
||||||
{ok, Client2} = emqtt:start_link([ {clientid, ClientId},
|
{ok, Client2} = emqtt:start_link([ {clientid, ClientId},
|
||||||
{clean_start, false},
|
{clean_start, false},
|
||||||
|
@ -790,7 +812,7 @@ t_snabbkaffe_pending_messages(Config) ->
|
||||||
|
|
||||||
?check_trace(
|
?check_trace(
|
||||||
begin
|
begin
|
||||||
snabbkaffe_sync_publish(Topic, Payloads, Config),
|
snabbkaffe_sync_publish(Topic, Payloads),
|
||||||
{ok, Client2} = emqtt:start_link([{clean_start, false} | EmqttOpts]),
|
{ok, Client2} = emqtt:start_link([{clean_start, false} | EmqttOpts]),
|
||||||
{ok, _} = emqtt:ConnFun(Client2),
|
{ok, _} = emqtt:ConnFun(Client2),
|
||||||
Msgs = receive_messages(length(Payloads)),
|
Msgs = receive_messages(length(Payloads)),
|
||||||
|
@ -829,7 +851,7 @@ t_snabbkaffe_buffered_messages(Config) ->
|
||||||
ok = emqtt:disconnect(Client1),
|
ok = emqtt:disconnect(Client1),
|
||||||
maybe_kill_connection_process(ClientId, Config),
|
maybe_kill_connection_process(ClientId, Config),
|
||||||
|
|
||||||
publish(Topic, Payloads1, Config),
|
publish(Topic, Payloads1),
|
||||||
|
|
||||||
?check_trace(
|
?check_trace(
|
||||||
begin
|
begin
|
||||||
|
@ -838,7 +860,7 @@ t_snabbkaffe_buffered_messages(Config) ->
|
||||||
#{ ?snk_kind := ps_resume_end }),
|
#{ ?snk_kind := ps_resume_end }),
|
||||||
spawn_link(fun() ->
|
spawn_link(fun() ->
|
||||||
?block_until(#{ ?snk_kind := ps_marker_pendings_msgs }, infinity, 5000),
|
?block_until(#{ ?snk_kind := ps_marker_pendings_msgs }, infinity, 5000),
|
||||||
publish(Topic, Payloads2, Config)
|
publish(Topic, Payloads2)
|
||||||
end),
|
end),
|
||||||
{ok, Client2} = emqtt:start_link([{clean_start, false} | EmqttOpts]),
|
{ok, Client2} = emqtt:start_link([{clean_start, false} | EmqttOpts]),
|
||||||
{ok, _} = emqtt:ConnFun(Client2),
|
{ok, _} = emqtt:ConnFun(Client2),
|
||||||
|
|
Loading…
Reference in New Issue