feat: persistent sessions stored in mnesia

* Persistent sessions can survive node/connection process down
* Internal SessionID is generated, external ClientID is as before
* Sessions are persisted to mnesia
* A session router is added in parallel to the ordinary router
* Messages that are subscribed to by a persistent session are
  persisted by publisher
* Information about persisted and delivered messages are stored in
  mnesia per session.
* A resume protocol similar to takeover is implemented for resuming
  from mnesia
* Can be configured (and enabled) by the top lever config
  "persistent_session_store"
This commit is contained in:
Tobias Lindahl 2021-10-07 09:23:20 +02:00
parent fc7b4c0009
commit 7b394267dd
22 changed files with 2655 additions and 77 deletions

View File

@ -1638,3 +1638,33 @@ example_common_websocket_options {
client_max_window_bits = 15
}
}
persistent_session_store {
## Enable/disable internal persistent session store.
##
## @doc persistent_session_store.enabled
## ValueType: Boolean
## Default: false
enabled = false
## How long are undelivered messages retained in the store
##
## @doc persistent_session_store.max_retain_undelivered
## ValueType: Duration
## Default: 1h
max_retain_undelivered = 1h
## The time interval in which to try to run garbage collection of persistent session messages
##
## @doc persistent_session_store.message_gc_interval
## ValueType: Duration
## Default: 1h
message_gc_interval = 1h
## The time interval in which to try to run garbage collection of persistent session transient data
##
## @doc persistent_session_store.session_message_gc_interval
## ValueType: Duration
## Default: 1m
session_message_gc_interval = 1m
}

View File

@ -23,10 +23,12 @@
-define(SHARED_SUB_SHARD, emqx_shared_sub_shard).
-define(CM_SHARD, emqx_cm_shard).
-define(ROUTE_SHARD, route_shard).
-define(PERSISTENT_SESSION_SHARD, emqx_persistent_session_shard).
-define(BOOT_SHARDS, [ ?ROUTE_SHARD
, ?COMMON_SHARD
, ?SHARED_SUB_SHARD
, ?PERSISTENT_SESSION_SHARD
]).
%% Banner
@ -87,7 +89,7 @@
-record(route, {
topic :: binary(),
dest :: node() | {binary(), node()}
dest :: node() | {binary(), node()} | binary()
}).
%%--------------------------------------------------------------------

View File

@ -206,6 +206,7 @@ publish(Msg) when is_record(Msg, message) ->
payload => emqx_message:to_log_map(Msg)}),
[];
Msg1 = #message{topic = Topic} ->
emqx_persistent_session:persist_message(Msg1),
route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1))
end.

View File

@ -178,8 +178,10 @@ info(timers, #channel{timers = Timers}) -> Timers.
set_conn_state(ConnState, Channel) ->
Channel#channel{conn_state = ConnState}.
set_session(Session, Channel) ->
Channel#channel{session = Session}.
set_session(Session, Channel = Channel = #channel{conninfo = ConnInfo, clientinfo = ClientInfo}) ->
%% Assume that this is also an updated session. Allow side effect.
Session1 = emqx_persistent_session:persist(ClientInfo, ConnInfo, Session),
Channel#channel{session = Session1}.
%% TODO: Add more stats.
-spec(stats(channel()) -> emqx_types:stats()).
@ -748,8 +750,23 @@ process_disconnect(ReasonCode, Properties, Channel) ->
{ok, {close, disconnect_reason(ReasonCode)}, NChannel}.
maybe_update_expiry_interval(#{'Session-Expiry-Interval' := Interval},
Channel = #channel{conninfo = ConnInfo}) ->
Channel#channel{conninfo = ConnInfo#{expiry_interval => timer:seconds(Interval)}};
Channel = #channel{conninfo = ConnInfo, clientinfo = ClientInfo}) ->
EI = timer:seconds(Interval),
OldEI = maps:get(expiry_interval, ConnInfo, 0),
case OldEI =:= EI of
true -> Channel;
false ->
NChannel = Channel#channel{conninfo = ConnInfo#{expiry_interval => EI}},
ClientID = maps:get(clientid, ClientInfo, undefined),
%% Check if the client turns off persistence (turning it on is disallowed)
case EI =:= 0 andalso OldEI > 0 of
true ->
S = emqx_persistent_session:discard(ClientID, NChannel#channel.session),
set_session(S, NChannel);
false ->
NChannel
end
end;
maybe_update_expiry_interval(_Properties, Channel) -> Channel.
%%--------------------------------------------------------------------
@ -762,41 +779,32 @@ handle_deliver(Delivers, Channel = #channel{conn_state = disconnected,
session = Session,
clientinfo = #{clientid := ClientId}}) ->
Delivers1 = maybe_nack(Delivers),
Delivers2 = ignore_local(Delivers1, ClientId, Session),
Delivers2 = emqx_session:ignore_local(Delivers1, ClientId, Session),
NSession = emqx_session:enqueue(Delivers2, Session),
NChannel = set_session(NSession, Channel),
%% We consider queued/dropped messages as delivered since they are now in the session state.
maybe_mark_as_delivered(Session, Delivers),
{ok, NChannel};
handle_deliver(Delivers, Channel = #channel{takeover = true,
pendings = Pendings,
session = Session,
clientinfo = #{clientid := ClientId}}) ->
NPendings = lists:append(Pendings, ignore_local(maybe_nack(Delivers), ClientId, Session)),
NPendings = lists:append(Pendings, emqx_session:ignore_local(maybe_nack(Delivers), ClientId, Session)),
{ok, Channel#channel{pendings = NPendings}};
handle_deliver(Delivers, Channel = #channel{session = Session,
clientinfo = #{clientid := ClientId}}) ->
case emqx_session:deliver(ignore_local(Delivers, ClientId, Session), Session) of
clientinfo = #{clientid := ClientId}
}) ->
case emqx_session:deliver(emqx_session:ignore_local(Delivers, ClientId, Session), Session) of
{ok, Publishes, NSession} ->
NChannel = set_session(NSession, Channel),
maybe_mark_as_delivered(NSession, Delivers),
handle_out(publish, Publishes, ensure_timer(retry_timer, NChannel));
{ok, NSession} ->
{ok, set_session(NSession, Channel)}
end.
ignore_local(Delivers, Subscriber, Session) ->
Subs = emqx_session:info(subscriptions, Session),
lists:dropwhile(fun({deliver, Topic, #message{from = Publisher}}) ->
case maps:find(Topic, Subs) of
{ok, #{nl := 1}} when Subscriber =:= Publisher ->
ok = emqx_metrics:inc('delivery.dropped'),
ok = emqx_metrics:inc('delivery.dropped.no_local'),
true;
_ ->
false
end
end, Delivers).
%% Nack delivers from shared subscription
maybe_nack(Delivers) ->
lists:filter(fun not_nacked/1, Delivers).
@ -805,6 +813,14 @@ not_nacked({deliver, _Topic, Msg}) ->
not (emqx_shared_sub:is_ack_required(Msg)
andalso (ok == emqx_shared_sub:nack_no_connection(Msg))).
maybe_mark_as_delivered(Session, Delivers) ->
case emqx_session:info(is_persistent, Session) of
false -> skip;
true ->
SessionID = emqx_session:info(id, Session),
emqx_persistent_session:mark_as_delivered(SessionID, Delivers)
end.
%%--------------------------------------------------------------------
%% Handle outgoing packet
%%--------------------------------------------------------------------
@ -1027,10 +1043,28 @@ handle_info(clean_authz_cache, Channel) ->
ok = emqx_authz_cache:empty_authz_cache(),
{ok, Channel};
handle_info(die_if_test = Info, Channel) ->
die_if_test_compiled(),
?LOG(error, "Unexpected info: ~p", [Info]),
{ok, Channel};
handle_info(Info, Channel) ->
?SLOG(error, #{msg => "unexpected_info", info => Info}),
{ok, Channel}.
-ifdef(TEST).
-spec die_if_test_compiled() -> no_return().
die_if_test_compiled() ->
exit(normal).
-else.
die_if_test_compiled() ->
ok.
-endif.
%%--------------------------------------------------------------------
%% Handle timeout
%%--------------------------------------------------------------------
@ -1144,11 +1178,19 @@ interval(will_timer, #channel{will_msg = WillMsg}) ->
terminate(_, #channel{conn_state = idle}) -> ok;
terminate(normal, Channel) ->
run_terminate_hook(normal, Channel);
terminate({shutdown, Reason}, Channel)
when Reason =:= kicked; Reason =:= discarded; Reason =:= takeovered ->
terminate({shutdown, kicked}, Channel) ->
_ = emqx_persistent_session:persist(Channel#channel.clientinfo,
Channel#channel.conninfo,
Channel#channel.session),
run_terminate_hook(kicked, Channel);
terminate({shutdown, Reason}, Channel) when Reason =:= discarded;
Reason =:= takeovered ->
run_terminate_hook(Reason, Channel);
terminate(Reason, Channel = #channel{will_msg = WillMsg}) ->
(WillMsg =/= undefined) andalso publish_will_msg(WillMsg),
_ = emqx_persistent_session:persist(Channel#channel.clientinfo,
Channel#channel.conninfo,
Channel#channel.session),
run_terminate_hook(Reason, Channel).
run_terminate_hook(_Reason, #channel{session = undefined}) -> ok;
@ -1612,8 +1654,11 @@ maybe_resume_session(#channel{resuming = false}) ->
ignore;
maybe_resume_session(#channel{session = Session,
resuming = true,
pendings = Pendings}) ->
pendings = Pendings,
clientinfo = #{clientid := ClientId}}) ->
{ok, Publishes, Session1} = emqx_session:replay(Session),
%% We consider queued/dropped messages as delivered since they are now in the session state.
emqx_persistent_session:mark_as_delivered(ClientId, Pendings),
case emqx_session:deliver(Pendings, Session1) of
{ok, Session2} ->
{ok, Publishes, Session2};

View File

@ -19,7 +19,6 @@
-behaviour(gen_server).
-include("emqx.hrl").
-include("logger.hrl").
-include("types.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
@ -214,9 +213,11 @@ open_session(true, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
Self = self(),
CleanStart = fun(_) ->
ok = discard_session(ClientId),
ok = emqx_persistent_session:discard_if_present(ClientId),
Session = create_session(ClientInfo, ConnInfo),
Session1 = emqx_persistent_session:persist(ClientInfo, ConnInfo, Session),
register_channel(ClientId, Self, ConnInfo),
{ok, #{session => Session, present => false}}
{ok, #{session => Session1, present => false}}
end,
emqx_cm_locker:trans(ClientId, CleanStart);
@ -224,17 +225,28 @@ open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
Self = self(),
ResumeStart = fun(_) ->
case takeover_session(ClientId) of
{ok, ConnMod, ChanPid, Session} ->
{persistent, Session} ->
%% This is a persistent session without a managing process.
{Session1, Pendings} =
emqx_persistent_session:resume(ClientInfo, ConnInfo, Session),
register_channel(ClientId, Self, ConnInfo),
{ok, #{session => Session1,
present => true,
pendings => Pendings}};
{living, ConnMod, ChanPid, Session} ->
ok = emqx_session:resume(ClientInfo, Session),
Session1 = emqx_persistent_session:persist(ClientInfo, ConnInfo, Session),
Pendings = ConnMod:call(ChanPid, {takeover, 'end'}, ?T_TAKEOVER),
register_channel(ClientId, Self, ConnInfo),
{ok, #{session => Session,
{ok, #{session => Session1,
present => true,
pendings => Pendings}};
{error, not_found} ->
Session = create_session(ClientInfo, ConnInfo),
Session1 = emqx_persistent_session:persist(ClientInfo, ConnInfo, Session),
register_channel(ClientId, Self, ConnInfo),
{ok, #{session => Session, present => false}}
{ok, #{session => Session1, present => false}}
end
end,
emqx_cm_locker:trans(ClientId, ResumeStart).
@ -246,13 +258,17 @@ create_session(ClientInfo, ConnInfo) ->
ok = emqx_hooks:run('session.created', [ClientInfo, emqx_session:info(Session)]),
Session.
get_session_confs(#{zone := Zone}, #{receive_maximum := MaxInflight}) ->
get_session_confs(#{zone := Zone}, #{receive_maximum := MaxInflight, expiry_interval := EI}) ->
#{max_subscriptions => get_mqtt_conf(Zone, max_subscriptions),
upgrade_qos => get_mqtt_conf(Zone, upgrade_qos),
max_inflight => MaxInflight,
retry_interval => get_mqtt_conf(Zone, retry_interval),
await_rel_timeout => get_mqtt_conf(Zone, await_rel_timeout),
mqueue => mqueue_confs(Zone)
mqueue => mqueue_confs(Zone),
%% TODO: Add conf for allowing/disallowing persistent sessions.
%% Note that the connection info is already enriched to have
%% default config values for session expiry.
is_persistent => EI > 0
}.
mqueue_confs(Zone) ->
@ -266,11 +282,17 @@ get_mqtt_conf(Zone, Key) ->
emqx_config:get_zone_conf(Zone, [mqtt, Key]).
%% @doc Try to takeover a session.
-spec(takeover_session(emqx_types:clientid()) ->
{error, term()} | {ok, atom(), pid(), emqx_session:session()}).
-spec(takeover_session(emqx_types:clientid())
-> {error, term()}
| {living, atom(), pid(), emqx_session:session()}
| {persistent, emqx_session:session()}).
takeover_session(ClientId) ->
case lookup_channels(ClientId) of
[] -> {error, not_found};
[] ->
case emqx_persistent_session:lookup(ClientId) of
[] -> {error, not_found};
[Session] -> {persistent, Session}
end;
[ChanPid] ->
takeover_session(ClientId, ChanPid);
ChanPids ->
@ -285,10 +307,13 @@ takeover_session(ClientId) ->
takeover_session(ClientId, ChanPid) when node(ChanPid) == node() ->
case get_chann_conn_mod(ClientId, ChanPid) of
undefined ->
{error, not_found};
case emqx_persistent_session:lookup(ClientId) of
[] -> {error, not_found};
[Session] -> {persistent, Session}
end;
ConnMod when is_atom(ConnMod) ->
Session = ConnMod:call(ChanPid, {takeover, 'begin'}, ?T_TAKEOVER),
{ok, ConnMod, ChanPid, Session}
{living, ConnMod, ChanPid, Session}
end;
takeover_session(ClientId, ChanPid) ->

View File

@ -39,6 +39,9 @@
, from_base62/1
]).
-export_type([ guid/0
]).
-define(TAG_VERSION, 131).
-define(PID_EXT, 103).
-define(NEW_PID_EXT, 88).

View File

@ -0,0 +1,492 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2018-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_persistent_session).
-export([ is_store_enabled/0
, init_db_backend/0
]).
-export([ discard/2
, discard_if_present/1
, lookup/1
, persist/3
, persist_message/1
, pending/1
, pending/2
, resume/3
]).
-export([ add_subscription/3
, remove_subscription/3
]).
-export([ mark_as_delivered/2
, mark_resume_begin/1
]).
-export([ pending_messages_in_db/2
, delete_session_message/1
, gc_session_messages/1
, session_message_info/2
]).
-export([ delete_message/1
, first_message_id/0
, next_message_id/1
]).
-export_type([ sess_msg_key/0
]).
-include("emqx.hrl").
-include("emqx_persistent_session.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-compile({inline, [is_store_enabled/0]}).
-define(MAX_EXPIRY_INTERVAL, 4294967295000). %% 16#FFFFFFFF * 1000
%% NOTE: Order is significant because of traversal order of the table.
-define(MARKER, 3).
-define(DELIVERED, 2).
-define(UNDELIVERED, 1).
-define(ABANDONED, 0).
-type bin_timestamp() :: <<_:64>>.
-opaque sess_msg_key() ::
{emqx_guid:guid(), emqx_guid:guid(), emqx_types:topic(), ?UNDELIVERED | ?DELIVERED}
| {emqx_guid:guid(), emqx_guid:guid(), <<>> , ?MARKER}
| {emqx_guid:guid(), <<>> , bin_timestamp() , ?ABANDONED}.
-type gc_traverse_fun() :: fun(('delete' | 'marker' | 'abandoned', sess_msg_key()) -> 'ok').
%%--------------------------------------------------------------------
%% Init
%%--------------------------------------------------------------------
init_db_backend() ->
case persistent_term:get({?MODULE, backend_init_done}, undefined) of
undefined ->
Backend = case is_store_enabled() of
true -> emqx_persistent_session_mnesia_backend;
false -> emqx_persistent_session_dummy_backend
end,
persistent_term:put(?db_backend_key, Backend),
persistent_term:put(backend_init_done, true);
true ->
ok
end.
is_store_enabled() ->
emqx_config:get(?is_enabled_key).
%%--------------------------------------------------------------------
%% Session message ADT API
%%--------------------------------------------------------------------
-spec session_message_info('timestamp' | 'sessionID', sess_msg_key()) -> term().
session_message_info(timestamp, {_, <<>>, <<TS:64>>, ?ABANDONED}) -> TS;
session_message_info(timestamp, {_, GUID, _ , _ }) -> emqx_guid:timestamp(GUID);
session_message_info(sessionID, {SessionID, _, _, _}) -> SessionID.
%%--------------------------------------------------------------------
%% DB API
%%--------------------------------------------------------------------
first_message_id() ->
?db_backend:first_message_id().
next_message_id(Key) ->
?db_backend:next_message_id(Key).
delete_message(Key) ->
?db_backend:delete_message(Key).
first_session_message() ->
?db_backend:first_session_message().
next_session_message(Key) ->
?db_backend:next_session_message(Key).
delete_session_message(Key) ->
?db_backend:delete_session_message(Key).
put_session_store(#session_store{} = SS) ->
?db_backend:put_session_store(SS).
delete_session_store(ClientID) ->
?db_backend:delete_session_store(ClientID).
lookup_session_store(ClientID) ->
?db_backend:lookup_session_store(ClientID).
put_session_message({_, _, _, _} = Key) ->
?db_backend:put_session_message(#session_msg{ key = Key }).
put_message(Msg) ->
?db_backend:put_message(Msg).
get_message(MsgId) ->
?db_backend:get_message(MsgId).
pending_messages_in_db(SessionID, MarkerIds) ->
?db_backend:ro_transaction(pending_messages_fun(SessionID, MarkerIds)).
%%--------------------------------------------------------------------
%% Session API
%%--------------------------------------------------------------------
%% The timestamp (TS) is the last time a client interacted with the session,
%% or when the client disconnected.
-spec persist(emqx_types:clientinfo(),
emqx_types:conninfo(),
emqx_session:session()) -> emqx_session:session().
persist(#{ clientid := ClientID }, ConnInfo, Session) ->
case ClientID == undefined orelse not emqx_session:info(is_persistent, Session) of
true -> Session;
false ->
SS = #session_store{ client_id = ClientID
, expiry_interval = maps:get(expiry_interval, ConnInfo)
, ts = timestamp_from_conninfo(ConnInfo)
, session = Session},
case persistent_session_status(SS) of
not_persistent -> Session;
expired -> discard(ClientID, Session);
persistent -> put_session_store(SS),
Session
end
end.
timestamp_from_conninfo(ConnInfo) ->
case maps:get(disconnected_at, ConnInfo, undefined) of
undefined -> erlang:system_time(millisecond);
Disconnect -> Disconnect
end.
lookup(ClientID) when is_binary(ClientID) ->
case lookup_session_store(ClientID) of
none -> [];
{value, #session_store{session = S} = SS} ->
case persistent_session_status(SS) of
not_persistent -> []; %% For completeness. Should not happen
expired -> [];
persistent -> [S]
end
end.
-spec discard_if_present(binary()) -> 'ok'.
discard_if_present(ClientID) ->
case lookup(ClientID) of
[] -> ok;
[Session] ->
_ = discard(ClientID, Session),
ok
end.
-spec discard(binary(), emgx_session:session()) -> emgx_session:session().
discard(ClientID, Session) ->
discard_opt(is_store_enabled(), ClientID, Session).
discard_opt(false,_ClientID, Session) ->
emqx_session:set_field(is_persistent, false, Session);
discard_opt(true, ClientID, Session) ->
delete_session_store(ClientID),
SessionID = emqx_session:info(id, Session),
put_session_message({SessionID, <<>>, << (erlang:system_time(microsecond)) : 64>>, ?ABANDONED}),
Subscriptions = emqx_session:info(subscriptions, Session),
emqx_session_router:delete_routes(SessionID, Subscriptions),
emqx_session:set_field(is_persistent, false, Session).
-spec mark_resume_begin(emqx_session:sessionID()) -> emqx_guid:guid().
mark_resume_begin(SessionID) ->
MarkerID = emqx_guid:gen(),
put_session_message({SessionID, MarkerID, <<>>, ?MARKER}),
MarkerID.
add_subscription(TopicFilter, SessionID, true = _IsPersistent) ->
emqx_session_router:do_add_route(TopicFilter, SessionID);
add_subscription(_TopicFilter, _SessionID, false = _IsPersistent) ->
ok.
remove_subscription(TopicFilter, SessionID, true = _IsPersistent) ->
emqx_session_router:do_delete_route(TopicFilter, SessionID);
remove_subscription(_TopicFilter, _SessionID, false = _IsPersistent) ->
ok.
%%--------------------------------------------------------------------
%% Resuming from DB state
%%--------------------------------------------------------------------
%% Must be called inside a emqx_cm_locker transaction.
-spec resume(emqx_types:clientinfo(), emqx_types:conninfo(), emqx_session:session()
) -> {emqx_session:session(), [emqx_types:deliver()]}.
resume(ClientInfo = #{clientid := ClientID}, ConnInfo, Session) ->
SessionID = emqx_session:info(id, Session),
?tp(ps_resuming, #{from => db, sid => SessionID}),
%% NOTE: Order is important!
%% 1. Get pending messages from DB.
?tp(ps_initial_pendings, #{sid => SessionID}),
Pendings1 = pending(SessionID),
Pendings2 = emqx_session:ignore_local(Pendings1, ClientID, Session),
?tp(ps_got_initial_pendings, #{ sid => SessionID
, msgs => Pendings1}),
%% 2. Enqueue messages to mimic that the process was alive
%% when the messages were delivered.
?tp(ps_persist_pendings, #{sid => SessionID}),
Session1 = emqx_session:enqueue(Pendings2, Session),
Session2 = persist(ClientInfo, ConnInfo, Session1),
mark_as_delivered(SessionID, Pendings2),
?tp(ps_persist_pendings_msgs, #{ msgs => Pendings2
, sid => SessionID}),
%% 3. Notify writers that we are resuming.
%% They will buffer new messages.
?tp(ps_notify_writers, #{sid => SessionID}),
Nodes = mria_mnesia:running_nodes(),
NodeMarkers = resume_begin(Nodes, SessionID),
?tp(ps_node_markers, #{sid => SessionID, markers => NodeMarkers}),
%% 4. Subscribe to topics.
?tp(ps_resume_session, #{sid => SessionID}),
ok = emqx_session:resume(ClientInfo, Session2),
%% 5. Get pending messages from DB until we find all markers.
?tp(ps_marker_pendings, #{sid => SessionID}),
MarkerIDs = [Marker || {_, Marker} <- NodeMarkers],
Pendings3 = pending(SessionID, MarkerIDs),
Pendings4 = emqx_session:ignore_local(Pendings3, ClientID, Session),
?tp(ps_marker_pendings_msgs, #{ sid => SessionID
, msgs => Pendings4}),
%% 6. Get pending messages from writers.
?tp(ps_resume_end, #{sid => SessionID}),
WriterPendings = resume_end(Nodes, SessionID),
?tp(ps_writer_pendings, #{ msgs => WriterPendings
, sid => SessionID}),
%% 7. Drain the inbox and usort the messages
%% with the pending messages. (Should be done by caller.)
{Session2, Pendings4 ++ WriterPendings}.
resume_begin(Nodes, SessionID) ->
Res = erpc:multicall(Nodes, emqx_session_router, resume_begin, [self(), SessionID]),
[{Node, Marker} || {{ok, {ok, Marker}}, Node} <- lists:zip(Res, Nodes)].
resume_end(Nodes, SessionID) ->
Res = erpc:multicall(Nodes, emqx_session_router, resume_end, [self(), SessionID]),
?tp(ps_erpc_multical_result, #{ res => Res, sid => SessionID }),
%% TODO: Should handle the errors
[ {deliver, STopic, M}
|| {ok, {ok, Messages}} <- Res,
{{M, STopic}} <- Messages
].
%%--------------------------------------------------------------------
%% Messages API
%%--------------------------------------------------------------------
persist_message(Msg) ->
case is_store_enabled() of
true -> do_persist_message(Msg);
false -> ok
end.
do_persist_message(Msg) ->
case emqx_message:get_flag(dup, Msg) orelse emqx_message:is_sys(Msg) of
true -> ok;
false ->
case emqx_session_router:match_routes(emqx_message:topic(Msg)) of
[] -> ok;
Routes ->
put_message(Msg),
MsgId = emqx_message:id(Msg),
persist_message_routes(Routes, MsgId, Msg)
end
end.
persist_message_routes([#route{dest = SessionID, topic = STopic}|Left], MsgId, Msg) ->
?tp(ps_persist_msg, #{sid => SessionID, payload => emqx_message:payload(Msg)}),
put_session_message({SessionID, MsgId, STopic, ?UNDELIVERED}),
emqx_session_router:buffer(SessionID, STopic, Msg),
persist_message_routes(Left, MsgId, Msg);
persist_message_routes([], _MsgId, _Msg) ->
ok.
mark_as_delivered(SessionID, List) ->
case is_store_enabled() of
true -> do_mark_as_delivered(SessionID, List);
false -> ok
end.
do_mark_as_delivered(SessionID, [{deliver, STopic, Msg}|Left]) ->
MsgID = emqx_message:id(Msg),
put_session_message({SessionID, MsgID, STopic, ?DELIVERED}),
do_mark_as_delivered(SessionID, Left);
do_mark_as_delivered(_SessionID, []) ->
ok.
-spec pending(emqx_session:sessionID()) ->
[{emqx_types:message(), STopic :: binary()}].
pending(SessionID) ->
pending(SessionID, []).
-spec pending(emqx_session:sessionID(), MarkerIDs :: [emqx_guid:guid()]) ->
[{emqx_types:message(), STopic :: binary()}].
pending(SessionID, MarkerIds) ->
%% TODO: Handle lost MarkerIDs
case emqx_session_router:pending(SessionID, MarkerIds) of
incomplete ->
timer:sleep(10),
pending(SessionID, MarkerIds);
Delivers ->
Delivers
end.
%%--------------------------------------------------------------------
%% Session internal functions
%%--------------------------------------------------------------------
%% @private [MQTT-3.1.2-23]
persistent_session_status(#session_store{expiry_interval = 0}) ->
not_persistent;
persistent_session_status(#session_store{expiry_interval = ?MAX_EXPIRY_INTERVAL}) ->
persistent;
persistent_session_status(#session_store{expiry_interval = E, ts = TS}) ->
case E + TS > erlang:system_time(millisecond) of
true -> persistent;
false -> expired
end.
%%--------------------------------------------------------------------
%% Pending messages internal functions
%%--------------------------------------------------------------------
pending_messages_fun(SessionID, MarkerIds) ->
fun() ->
case pending_messages({SessionID, <<>>, <<>>, ?DELIVERED}, [], MarkerIds) of
{Pending, []} -> read_pending_msgs(Pending, []);
{_Pending, [_|_]} -> incomplete
end
end.
read_pending_msgs([{MsgId, STopic}|Left], Acc) ->
Acc1 = try [{deliver, STopic, get_message(MsgId)}|Acc]
catch error:{msg_not_found, _} ->
HighwaterMark = erlang:system_time(microsecond)
- emqx_config:get(?msg_retain) * 1000,
case emqx_guid:timestamp(MsgId) < HighwaterMark of
true -> Acc; %% Probably cleaned by GC
false -> error({msg_not_found, MsgId})
end
end,
read_pending_msgs(Left, Acc1);
read_pending_msgs([], Acc) ->
lists:reverse(Acc).
%% The keys are ordered by
%% {sessionID(), <<>>, bin_timestamp(), ?ABANDONED} For abandoned sessions (clean started or expired).
%% {sessionID(), emqx_guid:guid(), STopic :: binary(), ?DELIVERED | ?UNDELIVERED | ?MARKER}
%% where
%% <<>> < emqx_guid:guid()
%% <<>> < bin_timestamp()
%% emqx_guid:guid() is ordered in ts() and by node()
%% ?ABANDONED < ?UNDELIVERED < ?DELIVERED < ?MARKER
%%
%% We traverse the table until we reach another session.
%% TODO: Garbage collect the delivered messages.
pending_messages({SessionID, PrevMsgId, PrevSTopic, PrevTag} = PrevKey, Acc, MarkerIds) ->
case next_session_message(PrevKey) of
{S, <<>>, _TS, ?ABANDONED} when S =:= SessionID ->
{[], []};
{S, MsgId, <<>>, ?MARKER} = Key when S =:= SessionID ->
MarkerIds1 = MarkerIds -- [MsgId],
case PrevTag =:= ?UNDELIVERED of
false -> pending_messages(Key, Acc, MarkerIds1);
true -> pending_messages(Key, [{PrevMsgId, PrevSTopic}|Acc], MarkerIds1)
end;
{S, MsgId, STopic, ?DELIVERED} = Key when S =:= SessionID,
MsgId =:= PrevMsgId,
STopic =:= PrevSTopic ->
pending_messages(Key, Acc, MarkerIds);
{S, _MsgId, _STopic, _Tag} = Key when S =:= SessionID ->
case PrevTag =:= ?UNDELIVERED of
false -> pending_messages(Key, Acc, MarkerIds);
true -> pending_messages(Key, [{PrevMsgId, PrevSTopic}|Acc], MarkerIds)
end;
_What -> %% Next sessionID or '$end_of_table'
case PrevTag =:= ?UNDELIVERED of
false -> {lists:reverse(Acc), MarkerIds};
true -> {lists:reverse([{PrevMsgId, PrevSTopic}|Acc]), MarkerIds}
end
end.
%%--------------------------------------------------------------------
%% Garbage collection
%%--------------------------------------------------------------------
-spec gc_session_messages(gc_traverse_fun()) -> 'ok'.
gc_session_messages(Fun) ->
gc_traverse(first_session_message(), <<>>, false, Fun).
gc_traverse('$end_of_table', _SessionID, _Abandoned, _Fun) ->
ok;
gc_traverse({S, <<>>, _TS, ?ABANDONED} = Key, _SessionID, _Abandoned, Fun) ->
%% Only report the abandoned session if it has no messages.
%% We want to keep the abandoned marker to last to make the GC reentrant.
case next_session_message(Key) of
'$end_of_table' = NextKey ->
ok = Fun(abandoned, Key),
gc_traverse(NextKey, S, true, Fun);
{S2, _, _, _} = NextKey when S =:= S2 ->
gc_traverse(NextKey, S, true, Fun);
{_, _, _, _} = NextKey ->
ok = Fun(abandoned, Key),
gc_traverse(NextKey, S, true, Fun)
end;
gc_traverse({S, _MsgID, <<>>, ?MARKER} = Key, SessionID, Abandoned, Fun) ->
ok = Fun(marker, Key),
NewAbandoned = S =:= SessionID andalso Abandoned,
gc_traverse(next_session_message(Key), S, NewAbandoned, Fun);
gc_traverse({S, _MsgID, _STopic, _Tag} = Key, SessionID, Abandoned, Fun) when Abandoned andalso
S =:= SessionID ->
%% Delete all messages from an abandoned session.
ok = Fun(delete, Key),
gc_traverse(next_session_message(Key), S, Abandoned, Fun);
gc_traverse({S, MsgID, STopic, ?UNDELIVERED} = Key, SessionID, Abandoned, Fun) ->
case next_session_message(Key) of
{S1, M, ST, ?DELIVERED} = NextKey when S1 =:= S andalso
MsgID =:= M andalso
STopic =:= ST ->
%% We have both markers for the same message/topic so it is safe to delete both.
ok = Fun(delete, Key),
ok = Fun(delete, NextKey),
gc_traverse(next_session_message(NextKey), S, Abandoned, Fun);
NextKey ->
%% Something else is here, so let's just loop.
NewAbandoned = S =:= SessionID andalso Abandoned,
gc_traverse(NextKey, SessionID, NewAbandoned, Fun)
end;
gc_traverse({S, _MsgID, _STopic, ?DELIVERED} = Key, SessionID, Abandoned, Fun) ->
%% We have a message that is marked as ?DELIVERED, but the ?UNDELIVERED is missing.
NewAbandoned = S =:= SessionID andalso Abandoned,
gc_traverse(next_session_message(Key), S, NewAbandoned, Fun).

View File

@ -0,0 +1,33 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2018-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-define(SESSION_STORE, emqx_session_store).
-define(SESS_MSG_TAB, emqx_session_msg).
-define(MSG_TAB, emqx_persistent_msg).
-record(session_store, { client_id :: binary()
, expiry_interval :: non_neg_integer()
, ts :: non_neg_integer()
, session :: emqx_session:session()}).
-record(session_msg, {key :: emqx_persistent_session:sess_msg_key(),
val = [] :: []}).
-define(db_backend_key, [persistent_session_store, db_backend]).
-define(is_enabled_key, [persistent_session_store, enabled]).
-define(msg_retain, [persistent_session_store, max_retain_undelivered]).
-define(db_backend, (persistent_term:get(?db_backend_key))).

View File

@ -0,0 +1,76 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2018-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_persistent_session_dummy_backend).
-include("emqx_persistent_session.hrl").
-export([ first_message_id/0
, next_message_id/1
, delete_message/1
, first_session_message/0
, next_session_message/1
, delete_session_message/1
, put_session_store/1
, delete_session_store/1
, lookup_session_store/1
, put_session_message/1
, put_message/1
, get_message/1
, ro_transaction/1
]).
first_message_id() ->
'$end_of_table'.
next_message_id(_) ->
'$end_of_table'.
-spec delete_message(binary()) -> no_return().
delete_message(_Key) ->
error(should_not_be_called).
first_session_message() ->
'$end_of_table'.
next_session_message(_Key) ->
'$end_of_table'.
delete_session_message(_Key) ->
ok.
put_session_store(#session_store{}) ->
ok.
delete_session_store(_ClientID) ->
ok.
lookup_session_store(_ClientID) ->
none.
put_session_message({_, _, _, _}) ->
ok.
put_message(_Msg) ->
ok.
-spec get_message(binary()) -> no_return().
get_message(_MsgId) ->
error(should_not_be_called).
ro_transaction(Fun) ->
Fun().

View File

@ -0,0 +1,153 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2017-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_persistent_session_gc).
-behaviour(gen_server).
-include("emqx_persistent_session.hrl").
%% API
-export([start_link/0]).
%% gen_server callbacks
-export([ init/1
, handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
]).
-ifdef(TEST).
-export([ session_gc_worker/2
, message_gc_worker/0
]).
-endif.
-define(SERVER, ?MODULE).
%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
%%--------------------------------------------------------------------
%% gen_server callbacks
%%--------------------------------------------------------------------
init([]) ->
process_flag(trap_exit, true),
{ok, start_session_gc_timer(#{})}.
handle_call(_Request, _From, State) ->
Reply = ok,
{reply, Reply, State}.
handle_cast(_Request, State) ->
{noreply, State}.
handle_info({timeout, Ref, session_gc_timeout}, State) ->
State1 = session_gc_timeout(Ref, State),
{noreply, State1};
handle_info({timeout, Ref, message_gc_timeout}, State) ->
State1 = message_gc_timeout(Ref, State),
{noreply, State1};
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
ok.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
%%--------------------------------------------------------------------
%% Session messages GC
%%--------------------------------------------------------------------
start_session_gc_timer(State) ->
Interval = emqx_config:get([persistent_session_store, session_message_gc_interval]),
State#{ session_gc_timer => erlang:start_timer(Interval, self(), session_gc_timeout)}.
session_gc_timeout(Ref, #{ session_gc_timer := R } = State) when R =:= Ref ->
%% Prevent overlapping processes.
GCPid = maps:get(session_gc_pid, State, undefined),
case GCPid =/= undefined andalso erlang:is_process_alive(GCPid) of
true -> start_session_gc_timer(State);
false -> start_session_gc_timer(State#{ session_gc_pid => proc_lib:spawn_link(fun session_gc_worker/0)})
end;
session_gc_timeout(_Ref, State) ->
State.
session_gc_worker() ->
ok = emqx_persistent_session:gc_session_messages(fun session_gc_worker/2).
%% TODO: Maybe these should be configurable?
-define(MARKER_GRACE_PERIOD, 60000000).
-define(ABANDONED_GRACE_PERIOD, 300000000).
session_gc_worker(delete, Key) ->
emqx_persistent_session:delete_session_message(Key);
session_gc_worker(marker, Key) ->
TS = emqx_persistent_session:session_message_info(timestamp, Key),
case TS + ?MARKER_GRACE_PERIOD < erlang:system_time(microsecond) of
true -> emqx_persistent_session:delete_session_message(Key);
false -> ok
end;
session_gc_worker(abandoned, Key) ->
TS = emqx_persistent_session:session_message_info(timestamp, Key),
case TS + ?ABANDONED_GRACE_PERIOD < erlang:system_time(microsecond) of
true -> emqx_persistent_session:delete_session_message(Key);
false -> ok
end.
%%--------------------------------------------------------------------
%% Message GC
%% --------------------------------------------------------------------
%% The message GC simply removes all messages older than the retain
%% period. A more exact GC would either involve treating the session
%% message table as root set, or some kind of reference counting.
%% We sacrifice space for simplicity at this point.
start_message_gc_timer(State) ->
Interval = emqx_config:get([persistent_session_store, session_message_gc_interval]),
State#{ message_gc_timer => erlang:start_timer(Interval, self(), message_gc_timeout)}.
message_gc_timeout(Ref, #{ message_gc_timer := R } = State) when R =:= Ref ->
%% Prevent overlapping processes.
GCPid = maps:get(message_gc_pid, State, undefined),
case GCPid =/= undefined andalso erlang:is_process_alive(GCPid) of
true -> start_message_gc_timer(State);
false -> start_message_gc_timer(State#{ message_gc_pid => proc_lib:spawn_link(fun message_gc_worker/0)})
end;
message_gc_timeout(_Ref, State) ->
State.
message_gc_worker() ->
HighWaterMark = erlang:system_time(microsecond) - emqx_config:get(?msg_retain) * 1000,
message_gc_worker(emqx_persistent_session:first_message_id(), HighWaterMark).
message_gc_worker('$end_of_table', _HighWaterMark) ->
ok;
message_gc_worker(MsgId, HighWaterMark) ->
case emqx_guid:timestamp(MsgId) < HighWaterMark of
true ->
emqx_persistent_session:delete_message(MsgId),
message_gc_worker(emqx_persistent_session:next_message_id(MsgId), HighWaterMark);
false ->
ok
end.

View File

@ -0,0 +1,121 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2018-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_persistent_session_mnesia_backend).
-boot_mnesia({mnesia, [boot]}).
-include("emqx.hrl").
-include("emqx_persistent_session.hrl").
-export([ mnesia/1
]).
-export([ first_message_id/0
, next_message_id/1
, delete_message/1
, first_session_message/0
, next_session_message/1
, delete_session_message/1
, put_session_store/1
, delete_session_store/1
, lookup_session_store/1
, put_session_message/1
, put_message/1
, get_message/1
, ro_transaction/1
]).
mnesia(Action) ->
emqx_persistent_session:init_db_backend(),
mnesia_opt(?db_backend =:= ?MODULE, Action).
mnesia_opt(false, _) ->
ok;
mnesia_opt(true, boot) ->
ok = mria:create_table(?SESSION_STORE, [
{type, set},
{rlog_shard, ?PERSISTENT_SESSION_SHARD},
{storage, disc_copies},
{record_name, session_store},
{attributes, record_info(fields, session_store)},
{storage_properties, [{ets, [{read_concurrency, true}]}]}]),
ok = mria:create_table(?SESS_MSG_TAB, [
{type, ordered_set},
{rlog_shard, ?PERSISTENT_SESSION_SHARD},
{storage, disc_copies},
{record_name, session_msg},
{attributes, record_info(fields, session_msg)},
{storage_properties, [{ets, [{read_concurrency, true},
{write_concurrency, true}]}]}]),
ok = mria:create_table(?MSG_TAB, [
{type, ordered_set},
{rlog_shard, ?PERSISTENT_SESSION_SHARD},
{storage, disc_copies},
{record_name, message},
{attributes, record_info(fields, message)},
{storage_properties, [{ets, [{read_concurrency, true},
{write_concurrency, true}]}]}]).
first_session_message() ->
mnesia:dirty_first(?SESS_MSG_TAB).
next_session_message(Key) ->
mnesia:dirty_next(?SESS_MSG_TAB, Key).
first_message_id() ->
mnesia:dirty_first(?MSG_TAB).
next_message_id(Key) ->
mnesia:dirty_next(?MSG_TAB, Key).
delete_message(Key) ->
mria:dirty_delete(?MSG_TAB, Key).
delete_session_message(Key) ->
mria:dirty_delete(?SESS_MSG_TAB, Key).
put_session_store(SS) ->
mria:dirty_write(?SESSION_STORE, SS).
delete_session_store(ClientID) ->
mria:dirty_delete(?SESSION_STORE, ClientID).
lookup_session_store(ClientID) ->
case mnesia:dirty_read(?SESSION_STORE, ClientID) of
[] -> none;
[SS] -> {value, SS}
end.
put_session_message(SessMsg) ->
mria:dirty_write(?SESS_MSG_TAB, SessMsg).
put_message(Msg) ->
mria:dirty_write(?MSG_TAB, Msg).
get_message(MsgId) ->
case mnesia:read(?MSG_TAB, MsgId) of
[] -> error({msg_not_found, MsgId});
[Msg] -> Msg
end.
ro_transaction(Fun) ->
{atomic, Res} = mria:ro_transaction(?PERSISTENT_SESSION_SHARD, Fun),
Res.

View File

@ -0,0 +1,60 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2018-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_persistent_session_sup).
-behaviour(supervisor).
-export([start_link/0]).
-export([init/1]).
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init([]) ->
case emqx_persistent_session:is_store_enabled() of
false ->
{ok, {{one_for_all, 0, 1}, []}};
true ->
%% We want this supervisor to own the table for restarts
SessionTab = emqx_session_router:create_init_tab(),
%% Resume worker sup
ResumeSup = #{id => router_worker_sup,
start => {emqx_session_router_worker_sup, start_link, [SessionTab]},
restart => permanent,
shutdown => 2000,
type => supervisor,
modules => [emqx_session_router_worker_sup]},
SessionRouterPool = emqx_pool_sup:spec(session_router_pool,
[session_router_pool, hash,
{emqx_session_router, start_link, []}]),
GCWorker = child_spec(emqx_persistent_session_gc, worker),
{ok, {{one_for_all, 0, 1}, [ResumeSup, SessionRouterPool, GCWorker]}}
end.
child_spec(Mod, worker) ->
#{id => Mod,
start => {Mod, start_link, []},
restart => permanent,
shutdown => 15000,
type => worker,
modules => [Mod]
}.

View File

@ -150,8 +150,30 @@ roots(low) ->
, {"flapping_detect",
sc(ref("flapping_detect"),
#{})}
, {"persistent_session_store",
sc(ref("persistent_session_store"),
#{})}
].
fields("persistent_session_store") ->
[ {"enabled",
sc(boolean(),
#{ default => "false"
})},
{"max_retain_undelivered",
sc(duration(),
#{ default => "1h"
})},
{"message_gc_interval",
sc(duration(),
#{ default => "1h"
})},
{"session_message_gc_interval",
sc(duration(),
#{ default => "1m"
})}
];
fields("stats") ->
[ {"enable",
sc(boolean(),

View File

@ -75,6 +75,7 @@
-export([ deliver/2
, enqueue/2
, dequeue/1
, ignore_local/3
, retry/1
, terminate/3
]).
@ -89,9 +90,17 @@
%% Export for CT
-export([set_field/3]).
-export_type([session/0]).
-type sessionID() :: emqx_guid:guid().
-export_type([ session/0
, sessionID/0
]).
-record(session, {
%% sessionID, fresh for all new sessions unless it is a resumed persistent session
id :: sessionID(),
%% Is this session a persistent session i.e. was it started with Session-Expiry > 0
is_persistent :: boolean(),
%% Clients Subscriptions.
subscriptions :: map(),
%% Max subscriptions allowed
@ -129,7 +138,9 @@
-type(replies() :: list(publish() | pubrel())).
-define(INFO_KEYS, [subscriptions,
-define(INFO_KEYS, [id,
is_persistent,
subscriptions,
upgrade_qos,
retry_interval,
await_rel_timeout,
@ -157,6 +168,7 @@
, await_rel_timeout => timeout()
, max_inflight => integer()
, mqueue => emqx_mqueue:options()
, is_persistent => boolean()
}.
%%--------------------------------------------------------------------
@ -171,6 +183,8 @@ init(Opts) ->
store_qos0 => true
}, maps:get(mqueue, Opts, #{})),
#session{
id = emqx_guid:gen(),
is_persistent = maps:get(is_persistent, Opts, false),
max_subscriptions = maps:get(max_subscriptions, Opts, infinity),
subscriptions = #{},
upgrade_qos = maps:get(upgrade_qos, Opts, false),
@ -195,6 +209,10 @@ info(Session) ->
info(Keys, Session) when is_list(Keys) ->
[{Key, info(Key, Session)} || Key <- Keys];
info(id, #session{id = Id}) ->
Id;
info(is_persistent, #session{is_persistent = Bool}) ->
Bool;
info(subscriptions, #session{subscriptions = Subs}) ->
Subs;
info(subscriptions_cnt, #session{subscriptions = Subs}) ->
@ -236,6 +254,23 @@ info(created_at, #session{created_at = CreatedAt}) ->
-spec(stats(session()) -> emqx_types:stats()).
stats(Session) -> info(?STATS_KEYS, Session).
%%--------------------------------------------------------------------
%% Ignore local messages
%%--------------------------------------------------------------------
ignore_local(Delivers, Subscriber, Session) ->
Subs = emqx_session:info(subscriptions, Session),
lists:dropwhile(fun({deliver, Topic, #message{from = Publisher}}) ->
case maps:find(Topic, Subs) of
{ok, #{nl := 1}} when Subscriber =:= Publisher ->
ok = emqx_metrics:inc('delivery.dropped'),
ok = emqx_metrics:inc('delivery.dropped.no_local'),
true;
_ ->
false
end
end, Delivers).
%%--------------------------------------------------------------------
%% Client -> Broker: SUBSCRIBE
%%--------------------------------------------------------------------
@ -244,11 +279,12 @@ stats(Session) -> info(?STATS_KEYS, Session).
emqx_types:subopts(), session())
-> {ok, session()} | {error, emqx_types:reason_code()}).
subscribe(ClientInfo = #{clientid := ClientId}, TopicFilter, SubOpts,
Session = #session{subscriptions = Subs}) ->
Session = #session{id = SessionID, is_persistent = IsPS, subscriptions = Subs}) ->
IsNew = not maps:is_key(TopicFilter, Subs),
case IsNew andalso is_subscriptions_full(Session) of
false ->
ok = emqx_broker:subscribe(TopicFilter, ClientId, SubOpts),
ok = emqx_persistent_session:add_subscription(TopicFilter, SessionID, IsPS),
ok = emqx_hooks:run('session.subscribed',
[ClientInfo, TopicFilter, SubOpts#{is_new => IsNew}]),
{ok, Session#session{subscriptions = maps:put(TopicFilter, SubOpts, Subs)}};
@ -268,10 +304,12 @@ is_subscriptions_full(#session{subscriptions = Subs,
-spec(unsubscribe(emqx_types:clientinfo(), emqx_types:topic(), emqx_types:subopts(), session())
-> {ok, session()} | {error, emqx_types:reason_code()}).
unsubscribe(ClientInfo, TopicFilter, UnSubOpts, Session = #session{subscriptions = Subs}) ->
unsubscribe(ClientInfo, TopicFilter, UnSubOpts,
Session = #session{id = SessionID, subscriptions = Subs, is_persistent = IsPS}) ->
case maps:find(TopicFilter, Subs) of
{ok, SubOpts} ->
ok = emqx_broker:unsubscribe(TopicFilter),
ok = emqx_persistent_session:remove_subscription(TopicFilter, SessionID, IsPS),
ok = emqx_hooks:run('session.unsubscribed', [ClientInfo, TopicFilter, maps:merge(SubOpts, UnSubOpts)]),
{ok, Session#session{subscriptions = maps:remove(TopicFilter, Subs)}};
error ->
@ -638,7 +676,7 @@ terminate(ClientInfo, discarded, Session) ->
run_hook('session.discarded', [ClientInfo, info(Session)]);
terminate(ClientInfo, takeovered, Session) ->
run_hook('session.takeovered', [ClientInfo, info(Session)]);
terminate(ClientInfo, Reason, Session) ->
terminate(#{clientid :=_ClientId} = ClientInfo, Reason, Session) ->
run_hook('session.terminated', [ClientInfo, Reason, info(Session)]).
-compile({inline, [run_hook/2]}).

View File

@ -0,0 +1,276 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2017-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_session_router).
-behaviour(gen_server).
-include("emqx.hrl").
-include("logger.hrl").
-include("types.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
%% Mnesia bootstrap
-export([mnesia/1]).
-boot_mnesia({mnesia, [boot]}).
-export([ create_init_tab/0
, start_link/2]).
%% Route APIs
-export([ delete_routes/2
, do_add_route/2
, do_delete_route/2
, match_routes/1
]).
-export([ buffer/3
, pending/2
, resume_begin/2
, resume_end/2
]).
-export([print_routes/1]).
%% gen_server callbacks
-export([ init/1
, handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3
]).
-type(group() :: binary()).
-type(dest() :: node() | {group(), node()}).
-define(ROUTE_TAB, emqx_session_route).
-define(SESSION_INIT_TAB, session_init_tab).
%%--------------------------------------------------------------------
%% Mnesia bootstrap
%%--------------------------------------------------------------------
mnesia(boot) ->
ok = mria:create_table(?ROUTE_TAB, [
{type, bag},
{rlog_shard, ?ROUTE_SHARD},
{storage, disc_copies},
{record_name, route},
{attributes, record_info(fields, route)},
{storage_properties, [{ets, [{read_concurrency, true},
{write_concurrency, true}]}]}]).
%%--------------------------------------------------------------------
%% Start a router
%%--------------------------------------------------------------------
create_init_tab() ->
emqx_tables:new(?SESSION_INIT_TAB, [public, {read_concurrency, true},
{write_concurrency, true}]).
-spec(start_link(atom(), pos_integer()) -> startlink_ret()).
start_link(Pool, Id) ->
gen_server:start_link({local, emqx_misc:proc_name(?MODULE, Id)},
?MODULE, [Pool, Id], [{hibernate_after, 1000}]).
%%--------------------------------------------------------------------
%% Route APIs
%%--------------------------------------------------------------------
-spec(do_add_route(emqx_topic:topic(), dest()) -> ok | {error, term()}).
do_add_route(Topic, SessionID) when is_binary(Topic) ->
Route = #route{topic = Topic, dest = SessionID},
case lists:member(Route, lookup_routes(Topic)) of
true -> ok;
false ->
case emqx_topic:wildcard(Topic) of
true ->
Fun = fun emqx_router_utils:insert_trie_route/2,
emqx_router_utils:maybe_trans(Fun, [?ROUTE_TAB, Route],
?PERSISTENT_SESSION_SHARD);
false ->
emqx_router_utils:insert_direct_route(?ROUTE_TAB, Route)
end
end.
%% @doc Match routes
-spec(match_routes(emqx_topic:topic()) -> [emqx_types:route()]).
match_routes(Topic) when is_binary(Topic) ->
case match_trie(Topic) of
[] -> lookup_routes(Topic);
Matched ->
lists:append([lookup_routes(To) || To <- [Topic | Matched]])
end.
%% Optimize: routing table will be replicated to all router nodes.
match_trie(Topic) ->
case emqx_trie:empty_session() of
true -> [];
false -> emqx_trie:match_session(Topic)
end.
%% Async
delete_routes(SessionID, Subscriptions) ->
cast(pick(SessionID), {delete_routes, SessionID, Subscriptions}).
-spec(do_delete_route(emqx_topic:topic(), dest()) -> ok | {error, term()}).
do_delete_route(Topic, SessionID) ->
Route = #route{topic = Topic, dest = SessionID},
case emqx_topic:wildcard(Topic) of
true ->
Fun = fun emqx_router_utils:delete_trie_route/2,
emqx_router_utils:maybe_trans(Fun, [?ROUTE_TAB, Route], ?PERSISTENT_SESSION_SHARD);
false ->
emqx_router_utils:delete_direct_route(?ROUTE_TAB, Route)
end.
%% @doc Print routes to a topic
-spec(print_routes(emqx_topic:topic()) -> ok).
print_routes(Topic) ->
lists:foreach(fun(#route{topic = To, dest = SessionID}) ->
io:format("~s -> ~p~n", [To, SessionID])
end, match_routes(Topic)).
%%--------------------------------------------------------------------
%% Session APIs
%%--------------------------------------------------------------------
pending(SessionID, MarkerIDs) ->
call(pick(SessionID), {pending, SessionID, MarkerIDs}).
buffer(SessionID, STopic, Msg) ->
case emqx_tables:lookup_value(?SESSION_INIT_TAB, SessionID) of
undefined -> ok;
Worker -> emqx_session_router_worker:buffer(Worker, STopic, Msg)
end.
-spec resume_begin(pid(), binary()) -> [{node(), emqx_guid:guid()}].
resume_begin(From, SessionID) when is_pid(From), is_binary(SessionID) ->
call(pick(SessionID), {resume_begin, From, SessionID}).
-spec resume_end(pid(), binary()) ->
{'ok', [emqx_types:message()]} | {'error', term()}.
resume_end(From, SessionID) when is_pid(From), is_binary(SessionID) ->
case emqx_tables:lookup_value(?SESSION_INIT_TAB, SessionID) of
undefined ->
?tp(ps_session_not_found, #{ sid => SessionID }),
{error, not_found};
Pid ->
Res = emqx_session_router_worker:resume_end(From, Pid, SessionID),
cast(pick(SessionID), {resume_end, SessionID, Pid}),
Res
end.
%%--------------------------------------------------------------------
%% Worker internals
%%--------------------------------------------------------------------
call(Router, Msg) ->
gen_server:call(Router, Msg, infinity).
cast(Router, Msg) ->
gen_server:cast(Router, Msg).
pick(#route{dest = SessionID}) ->
gproc_pool:pick_worker(session_router_pool, SessionID);
pick(SessionID) when is_binary(SessionID) ->
gproc_pool:pick_worker(session_router_pool, SessionID).
%%--------------------------------------------------------------------
%% gen_server callbacks
%%--------------------------------------------------------------------
init([Pool, Id]) ->
true = gproc_pool:connect_worker(Pool, {Pool, Id}),
{ok, #{pool => Pool, id => Id, pmon => emqx_pmon:new()}}.
handle_call({resume_begin, RemotePid, SessionID}, _From, State) ->
case init_resume_worker(RemotePid, SessionID, State) of
error ->
{reply, error, State};
{ok, Pid, State1} ->
ets:insert(?SESSION_INIT_TAB, {SessionID, Pid}),
MarkerID = emqx_persistent_session:mark_resume_begin(SessionID),
{reply, {ok, MarkerID}, State1}
end;
handle_call({pending, SessionID, MarkerIDs}, _From, State) ->
Res = emqx_persistent_session:pending_messages_in_db(SessionID, MarkerIDs),
{reply, Res, State};
handle_call(Req, _From, State) ->
?LOG(error, "Unexpected call: ~p", [Req]),
{reply, ignored, State}.
handle_cast({delete_routes, SessionID, Subscriptions}, State) ->
%% TODO: Make a batch for deleting all routes.
Fun = fun({Topic, _}) -> do_delete_route(Topic, SessionID) end,
ok = lists:foreach(Fun, maps:to_list(Subscriptions)),
{noreply, State};
handle_cast({resume_end, SessionID, Pid}, State) ->
case emqx_tables:lookup_value(?SESSION_INIT_TAB, SessionID) of
undefined -> skip;
P when P =:= Pid -> ets:delete(?SESSION_INIT_TAB, SessionID);
P when is_pid(P) -> skip
end,
Pmon = emqx_pmon:demonitor(Pid, maps:get(pmon, State)),
_ = emqx_session_router_worker_sup:abort_worker(Pid),
{noreply, State#{ pmon => Pmon }};
handle_cast(Msg, State) ->
?LOG(error, "Unexpected cast: ~p", [Msg]),
{noreply, State}.
handle_info(Info, State) ->
?LOG(error, "Unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, #{pool := Pool, id := Id}) ->
gproc_pool:disconnect_worker(Pool, {Pool, Id}).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%--------------------------------------------------------------------
%% Resume worker. A process that buffers the persisted messages during
%% initialisation of a resuming session.
%%--------------------------------------------------------------------
init_resume_worker(RemotePid, SessionID, #{ pmon := Pmon } = State) ->
case emqx_session_router_worker_sup:start_worker(SessionID, RemotePid) of
{error, What} ->
?SLOG(error, #{msg => "Could not start resume worker", reason => What}),
error;
{ok, Pid} ->
Pmon1 = emqx_pmon:monitor(Pid, Pmon),
case emqx_tables:lookup_value(?SESSION_INIT_TAB, SessionID) of
undefined ->
{ok, Pid, State#{ pmon => Pmon1 }};
{_, OldPid} ->
Pmon2 = emqx_pmon:demonitor(OldPid, Pmon1),
_ = emqx_session_router_worker_sup:abort_worker(OldPid),
{ok, Pid, State#{ pmon => Pmon2 }}
end
end.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
lookup_routes(Topic) ->
ets:lookup(?ROUTE_TAB, Topic).

View File

@ -0,0 +1,148 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2018-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc The session router worker is responsible for buffering
%% messages for a persistent session while it is initializing. If a
%% connection process exists for a persistent session, this process is
%% used for bridging the gap while the new connection process takes
%% over the persistent session, but if there is no such process this
%% worker takes it place.
%%
%% The workers are started on all nodes, and buffers all messages that
%% are persisted to the session message table. In the final stage of
%% the initialization, the messages are delivered and the worker is
%% terminated.
-module(emqx_session_router_worker).
-behaviour(gen_server).
%% API
-export([ buffer/3
, pendings/1
, resume_end/3
, start_link/2
]).
%% gen_server callbacks
-export([ init/1
, handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
]).
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-record(state, { remote_pid :: pid()
, session_id :: binary()
, session_tab :: ets:table()
, messages :: ets:table()
, buffering :: boolean()
}).
%%%===================================================================
%%% API
%%%===================================================================
start_link(SessionTab, #{} = Opts) ->
gen_server:start_link(?MODULE, Opts#{ session_tab => SessionTab}, []).
pendings(Pid) ->
gen_server:call(Pid, pendings).
resume_end(RemotePid, Pid, _SessionID) ->
case gen_server:call(Pid, {resume_end, RemotePid}) of
{ok, EtsHandle} ->
?tp(ps_worker_call_ok, #{ pid => Pid
, remote_pid => RemotePid
, sid => _SessionID}),
{ok, ets:tab2list(EtsHandle)};
{error, _} = Err ->
?tp(ps_worker_call_failed, #{ pid => Pid
, remote_pid => RemotePid
, sid => _SessionID
, reason => Err}),
Err
end.
buffer(Worker, STopic, Msg) ->
Worker ! {buffer, STopic, Msg},
ok.
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
init(#{ remote_pid := RemotePid
, session_id := SessionID
, session_tab := SessionTab}) ->
process_flag(trap_exit, true),
erlang:monitor(process, RemotePid),
?tp(ps_worker_started, #{ remote_pid => RemotePid
, sid => SessionID }),
{ok, #state{ remote_pid = RemotePid
, session_id = SessionID
, session_tab = SessionTab
, messages = ets:new(?MODULE, [protected, ordered_set])
, buffering = true
}}.
handle_call(pendings, _From, State) ->
%% Debug API
{reply, {State#state.messages, State#state.remote_pid}, State};
handle_call({resume_end, RemotePid}, _From, #state{remote_pid = RemotePid} = State) ->
?tp(ps_worker_resume_end, #{sid => State#state.session_id}),
{reply, {ok, State#state.messages}, State#state{ buffering = false }};
handle_call({resume_end, _RemotePid}, _From, State) ->
?tp(ps_worker_resume_end_error, #{sid => State#state.session_id}),
{reply, {error, wrong_remote_pid}, State};
handle_call(_Request, _From, State) ->
Reply = ok,
{reply, Reply, State}.
handle_cast(_Request, State) ->
{noreply, State}.
handle_info({buffer, _STopic, _Msg}, State) when not State#state.buffering ->
?tp(ps_worker_drop_deliver, #{ sid => State#state.session_id
, msg_id => emqx_message:id(_Msg)
}),
{noreply, State};
handle_info({buffer, STopic, Msg}, State) when State#state.buffering ->
?tp(ps_worker_deliver, #{ sid => State#state.session_id
, msg_id => emqx_message:id(Msg)
}),
ets:insert(State#state.messages, {{Msg, STopic}}),
{noreply, State};
handle_info({'DOWN', _, process, RemotePid, _Reason}, #state{remote_pid = RemotePid} = State) ->
?tp(warning, ps_worker, #{ event => worker_remote_died
, sid => State#state.session_id
, msg => "Remote pid died. Exiting." }),
{stop, normal, State};
handle_info(_Info, State) ->
{noreply, State}.
terminate(shutdown, _State) ->
?tp(ps_worker_shutdown, #{ sid => _State#state.session_id }),
ok;
terminate(_, _State) ->
ok.
%%%===================================================================
%%% Internal functions
%%%===================================================================

View File

@ -0,0 +1,53 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2018-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_session_router_worker_sup).
-behaviour(supervisor).
-export([ start_link/1
]).
-export([ abort_worker/1
, start_worker/2
]).
-export([ init/1
]).
start_link(SessionTab) ->
supervisor:start_link({local, ?MODULE}, ?MODULE, SessionTab).
start_worker(SessionID, RemotePid) ->
supervisor:start_child(?MODULE, [#{ session_id => SessionID
, remote_pid => RemotePid}]).
abort_worker(Pid) ->
supervisor:terminate_child(?MODULE, Pid).
%%--------------------------------------------------------------------
%% Supervisor callbacks
%%--------------------------------------------------------------------
init(SessionTab) ->
%% Resume worker
Worker = #{id => session_router_worker,
start => {emqx_session_router_worker, start_link, [SessionTab]},
restart => transient,
shutdown => 2000,
type => worker,
modules => [emqx_session_router_worker]},
{ok, {{simple_one_for_one, 0, 1}, [Worker]}}.

View File

@ -65,9 +65,10 @@ init([]) ->
KernelSup = child_spec(emqx_kernel_sup, supervisor),
RouterSup = child_spec(emqx_router_sup, supervisor),
BrokerSup = child_spec(emqx_broker_sup, supervisor),
SessionSup = child_spec(emqx_persistent_session_sup, supervisor),
CMSup = child_spec(emqx_cm_sup, supervisor),
SysSup = child_spec(emqx_sys_sup, supervisor),
Children = [KernelSup] ++
Children = [KernelSup, SessionSup] ++
[RouterSup || emqx_boot:is_enabled(router)] ++
[BrokerSup || emqx_boot:is_enabled(broker)] ++
[CMSup || emqx_boot:is_enabled(broker)] ++

View File

@ -25,12 +25,17 @@
%% Trie APIs
-export([ insert/1
, insert_session/1
, match/1
, match_session/1
, delete/1
, delete_session/1
]).
-export([ empty/0
, empty_session/0
, lock_tables/0
, lock_session_tables/0
]).
-export([is_compact/0, set_compact/1]).
@ -41,6 +46,7 @@
-endif.
-define(TRIE, emqx_trie).
-define(SESSION_TRIE, emqx_session_trie).
-define(PREFIX(Prefix), {Prefix, 0}).
-define(TOPIC(Topic), {Topic, 1}).
@ -66,7 +72,20 @@ mnesia(boot) ->
{record_name, ?TRIE},
{attributes, record_info(fields, ?TRIE)},
{type, ordered_set},
{storage_properties, StoreProps}]).
{storage_properties, StoreProps}]),
case emqx_persistent_session:is_store_enabled() of
true ->
ok = mria:create_table(?SESSION_TRIE, [
{rlog_shard, ?ROUTE_SHARD},
{storage, disc_copies},
{record_name, ?TRIE},
{attributes, record_info(fields, ?TRIE)},
{type, ordered_set},
{storage_properties, StoreProps}]);
false ->
ok
end.
%%--------------------------------------------------------------------
%% Topics APIs
@ -77,6 +96,10 @@ mnesia(boot) ->
insert(Topic) when is_binary(Topic) ->
insert(Topic, ?TRIE).
-spec(insert_session(emqx_topic:topic()) -> ok).
insert_session(Topic) when is_binary(Topic) ->
insert(Topic, ?SESSION_TRIE).
insert(Topic, Trie) when is_binary(Topic) ->
{TopicKey, PrefixKeys} = make_keys(Topic),
case mnesia:wread({Trie, TopicKey}) of
@ -89,6 +112,11 @@ insert(Topic, Trie) when is_binary(Topic) ->
delete(Topic) when is_binary(Topic) ->
delete(Topic, ?TRIE).
%% @doc Delete a topic filter from the trie.
-spec(delete_session(emqx_topic:topic()) -> ok).
delete_session(Topic) when is_binary(Topic) ->
delete(Topic, ?SESSION_TRIE).
delete(Topic, Trie) when is_binary(Topic) ->
{TopicKey, PrefixKeys} = make_keys(Topic),
case [] =/= mnesia:wread({Trie, TopicKey}) of
@ -100,6 +128,11 @@ delete(Topic, Trie) when is_binary(Topic) ->
-spec(match(emqx_types:topic()) -> list(emqx_types:topic())).
match(Topic) when is_binary(Topic) ->
match(Topic, ?TRIE).
-spec(match_session(emqx_topic:topic()) -> list(emqx_topic:topic())).
match_session(Topic) when is_binary(Topic) ->
match(Topic, ?SESSION_TRIE).
match(Topic, Trie) when is_binary(Topic) ->
Words = emqx_topic:words(Topic),
case emqx_topic:wildcard(Words) of
@ -120,12 +153,19 @@ match(Topic, Trie) when is_binary(Topic) ->
-spec(empty() -> boolean()).
empty() -> empty(?TRIE).
empty_session() ->
empty(?SESSION_TRIE).
empty(Trie) -> ets:first(Trie) =:= '$end_of_table'.
-spec lock_tables() -> ok.
lock_tables() ->
mnesia:write_lock_table(?TRIE).
-spec lock_session_tables() -> ok.
lock_session_tables() ->
mnesia:write_lock_table(?SESSION_TRIE).
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------

View File

@ -98,6 +98,7 @@ t_open_session(_) ->
sockname => {{127,0,0,1}, 1883},
peercert => nossl,
conn_mod => emqx_connection,
expiry_interval => 0,
receive_maximum => 100},
{ok, #{session := Session1, present := false}}
= emqx_cm:open_session(true, ClientInfo, ConnInfo),
@ -123,6 +124,7 @@ t_open_session_race_condition(_) ->
sockname => {{127,0,0,1}, 1883},
peercert => nossl,
conn_mod => emqx_connection,
expiry_interval => 0,
receive_maximum => 100},
Parent = self(),
@ -228,7 +230,7 @@ t_takeover_session(_) ->
end
end),
timer:sleep(100),
{ok, emqx_connection, _, test} = emqx_cm:takeover_session(<<"clientid">>),
{living, emqx_connection, _, test} = emqx_cm:takeover_session(<<"clientid">>),
emqx_cm:unregister_channel(<<"clientid">>).
t_kick_session(_) ->

View File

@ -131,7 +131,6 @@ clean_retained(Topic, Config) ->
t_basic_test(Config) ->
ConnFun = ?config(conn_fun, Config),
Topic = nth(1, ?TOPICS),
ct:print("Basic test starting"),
{ok, C} = emqtt:start_link([{proto_ver, v5} | Config]),
{ok, _} = emqtt:ConnFun(C),
{ok, _, [1]} = emqtt:subscribe(C, Topic, qos1),
@ -333,37 +332,6 @@ t_connect_keepalive_timeout(Config) ->
error("keepalive timeout")
end.
%% [MQTT-3.1.2-23]
t_connect_session_expiry_interval(Config) ->
ConnFun = ?config(conn_fun, Config),
Topic = nth(1, ?TOPICS),
Payload = "test message",
{ok, Client1} = emqtt:start_link([ {clientid, <<"t_connect_session_expiry_interval">>},
{proto_ver, v5},
{properties, #{'Session-Expiry-Interval' => 7200}}
| Config
]),
{ok, _} = emqtt:ConnFun(Client1),
{ok, _, [2]} = emqtt:subscribe(Client1, Topic, qos2),
ok = emqtt:disconnect(Client1),
{ok, Client2} = emqtt:start_link([{proto_ver, v5} | Config]),
{ok, _} = emqtt:ConnFun(Client2),
{ok, 2} = emqtt:publish(Client2, Topic, Payload, 2),
ok = emqtt:disconnect(Client2),
{ok, Client3} = emqtt:start_link([ {clientid, <<"t_connect_session_expiry_interval">>},
{proto_ver, v5},
{clean_start, false} | Config
]),
{ok, _} = emqtt:ConnFun(Client3),
[Msg | _ ] = receive_messages(1),
?assertEqual({ok, iolist_to_binary(Topic)}, maps:find(topic, Msg)),
?assertEqual({ok, iolist_to_binary(Payload)}, maps:find(payload, Msg)),
?assertEqual({ok, 2}, maps:find(qos, Msg)),
ok = emqtt:disconnect(Client3).
%% [MQTT-3.1.3-9]
%% !!!REFACTOR NEED:
%t_connect_will_delay_interval(Config) ->

View File

@ -0,0 +1,989 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_persistent_session_SUITE).
-include_lib("stdlib/include/assert.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("../include/emqx.hrl").
-include("../src/emqx_persistent_session.hrl").
-compile(export_all).
-compile(nowarn_export_all).
%%--------------------------------------------------------------------
%% SUITE boilerplate
%%--------------------------------------------------------------------
all() ->
[ {group, persistent_store_enabled}
, {group, persistent_store_disabled}
].
%% A persistent session can be resumed in two ways:
%% 1. The old connection process is still alive, and the session is taken
%% over by the new connection.
%% 2. The old session process has died (e.g., because of node down).
%% The new process resumes the session from the stored state, and finds
%% any subscribed messages from the persistent message store.
%%
%% We want to test both ways, both with the db backend enabled and disabled.
%%
%% In addition, we test both tcp and quic connections.
groups() ->
TCs = emqx_common_test_helpers:all(?MODULE),
SnabbkaffeTCs = [TC || TC <- TCs, is_snabbkaffe_tc(TC)],
GCTests = [TC || TC <- TCs, is_gc_tc(TC)],
OtherTCs = (TCs -- SnabbkaffeTCs) -- GCTests,
[ {persistent_store_enabled, [ {group, no_kill_connection_process}
, {group, kill_connection_process}
, {group, snabbkaffe}
, {group, gc_tests}
]}
, {persistent_store_disabled, [ {group, no_kill_connection_process}
]}
, {no_kill_connection_process, [], [{group, tcp}, {group, quic}]}
, { kill_connection_process, [], [{group, tcp}, {group, quic}]}
, {snabbkaffe, [], [{group, tcp_snabbkaffe}, {group, quic_snabbkaffe}]}
, {tcp, [], OtherTCs}
, {quic, [], OtherTCs}
, {tcp_snabbkaffe, [], SnabbkaffeTCs}
, {quic_snabbkaffe, [], SnabbkaffeTCs}
, {gc_tests, [], GCTests}
].
is_snabbkaffe_tc(TC) ->
re:run(atom_to_list(TC), "^t_snabbkaffe_") /= nomatch.
is_gc_tc(TC) ->
re:run(atom_to_list(TC), "^t_gc_") /= nomatch.
init_per_group(persistent_store_enabled, Config) ->
%% Start Apps
emqx_common_test_helpers:boot_modules(all),
meck:new(emqx_config, [non_strict, passthrough, no_history, no_link]),
meck:expect(emqx_config, get, fun(?is_enabled_key) -> true;
(Other) -> meck:passthrough([Other])
end),
emqx_common_test_helpers:start_apps([], fun set_special_confs/1),
Config;
init_per_group(persistent_store_disabled, Config) ->
%% Start Apps
emqx_common_test_helpers:boot_modules(all),
meck:new(emqx_config, [non_strict, passthrough, no_history, no_link]),
meck:expect(emqx_config, get, fun(?is_enabled_key) -> false;
(Other) -> meck:passthrough([Other])
end),
emqx_common_test_helpers:start_apps([], fun set_special_confs/1),
Config;
init_per_group(Group, Config) when Group == tcp; Group == tcp_snabbkaffe ->
[ {port, 1883}, {conn_fun, connect}| Config];
init_per_group(Group, Config) when Group == quic; Group == quic_snabbkaffe ->
[ {port, 14567}, {conn_fun, quic_connect} | Config];
init_per_group(no_kill_connection_process, Config) ->
[ {kill_connection_process, false} | Config];
init_per_group(kill_connection_process, Config) ->
[ {kill_connection_process, true} | Config];
init_per_group(snabbkaffe, Config) ->
[ {kill_connection_process, true} | Config];
init_per_group(gc_tests, Config) ->
%% We need to make sure the system does not interfere with this test group.
emqx_common_test_helpers:stop_apps([]),
SessionMsgEts = gc_tests_session_store,
MsgEts = gc_tests_msg_store,
Pid = spawn(fun() ->
ets:new(SessionMsgEts, [named_table, public, ordered_set]),
ets:new(MsgEts, [named_table, public, ordered_set, {keypos, 2}]),
receive stop -> ok end
end),
meck:new(mnesia, [non_strict, passthrough, no_history, no_link]),
meck:expect(mnesia, dirty_first, fun(?SESS_MSG_TAB) -> ets:first(SessionMsgEts);
(?MSG_TAB) -> ets:first(MsgEts);
(X) -> meck:passthrough(X)
end),
meck:expect(mnesia, dirty_next, fun(?SESS_MSG_TAB, X) -> ets:next(SessionMsgEts, X);
(?MSG_TAB, X) -> ets:next(MsgEts, X);
(Tab, X) -> meck:passthrough([Tab, X])
end),
meck:expect(mnesia, dirty_delete, fun(?MSG_TAB, X) -> ets:delete(MsgEts, X);
(Tab, X) -> meck:passthrough([Tab, X])
end),
[{store_owner, Pid}, {session_msg_store, SessionMsgEts}, {msg_store, MsgEts} | Config].
init_per_suite(Config) ->
Config.
set_special_confs(emqx) ->
Path = emqx_common_test_helpers:deps_path(emqx, "test/emqx_SUITE_data/loaded_plugins"),
application:set_env(emqx, plugins_loaded_file, Path);
set_special_confs(_) ->
ok.
end_per_suite(_Config) ->
ok.
end_per_group(gc_tests, Config) ->
meck:unload(mnesia),
?config(store_owner, Config) ! stop,
ok;
end_per_group(persistent_store_enabled, _Config) ->
meck:unload(emqx_config),
emqx_common_test_helpers:stop_apps([]);
end_per_group(persistent_store_disabled, _Config) ->
meck:unload(emqx_config),
emqx_common_test_helpers:stop_apps([]);
end_per_group(_Group, _Config) ->
ok.
init_per_testcase(TestCase, Config) ->
Config1 = preconfig_per_testcase(TestCase, Config),
case is_gc_tc(TestCase) of
true ->
ets:delete_all_objects(?config(msg_store, Config)),
ets:delete_all_objects(?config(session_msg_store, Config));
false ->
skip
end,
case erlang:function_exported(?MODULE, TestCase, 2) of
true -> ?MODULE:TestCase(init, Config1);
_ -> Config1
end.
end_per_testcase(TestCase, Config) ->
case is_snabbkaffe_tc(TestCase) of
true -> snabbkaffe:stop();
false -> skip
end,
case erlang:function_exported(?MODULE, TestCase, 2) of
true -> ?MODULE:TestCase('end', Config);
false -> ok
end,
Config.
preconfig_per_testcase(TestCase, Config) ->
{BaseName, Config1} =
case ?config(tc_group_properties, Config) of
[] ->
%% We are running a single testcase
{atom_to_binary(TestCase),
init_per_group(tcp, init_per_group(kill_connection_process, Config))};
[_|_] = Props->
Path = lists:reverse(?config(tc_group_path, Config) ++ Props),
Pre0 = [atom_to_list(N) || {name, N} <- lists:flatten(Path)],
Pre1 = lists:join("_", Pre0 ++ [atom_to_binary(TestCase)]),
{iolist_to_binary(Pre1),
Config}
end,
[ {topic, iolist_to_binary([BaseName, "/foo"])}
, {stopic, iolist_to_binary([BaseName, "/+"])}
, {stopic_alt, iolist_to_binary([BaseName, "/foo"])}
, {client_id, BaseName}
| Config1].
%%--------------------------------------------------------------------
%% Helpers
%%--------------------------------------------------------------------
client_info(Key, Client) ->
maps:get(Key, maps:from_list(emqtt:info(Client)), undefined).
receive_messages(Count) ->
receive_messages(Count, []).
receive_messages(0, Msgs) ->
Msgs;
receive_messages(Count, Msgs) ->
receive
{publish, Msg} ->
receive_messages(Count-1, [Msg|Msgs]);
_Other ->
receive_messages(Count, Msgs)
after 1000 ->
Msgs
end.
maybe_kill_connection_process(ClientId, Config) ->
case ?config(kill_connection_process, Config) of
true ->
[ConnectionPid] = emqx_cm:lookup_channels(ClientId),
?assert(is_pid(ConnectionPid)),
ConnectionPid ! die_if_test,
ok;
false ->
ok
end.
snabbkaffe_sync_publish(Topic, Payloads, Config) ->
Fun = fun(Client, Payload) ->
?wait_async_action( {ok, _} = emqtt:publish(Client, Topic, Payload, 2)
, #{?snk_kind := ps_persist_msg, payload := Payload}
)
end,
do_publish(Payloads, Fun, Config).
publish(Topic, Payloads, Config) ->
Fun = fun(Client, Payload) ->
{ok, _} = emqtt:publish(Client, Topic, Payload, 2)
end,
do_publish(Payloads, Fun, Config).
do_publish(Payloads = [_|_], PublishFun, Config) ->
%% Publish from another process to avoid connection confusion.
{Pid, Ref} =
spawn_monitor(
fun() ->
ConnFun = ?config(conn_fun, Config),
{ok, Client} = emqtt:start_link([ {proto_ver, v5}
| Config]),
{ok, _} = emqtt:ConnFun(Client),
lists:foreach(fun(Payload) -> PublishFun(Client, Payload)end, Payloads),
ok = emqtt:disconnect(Client)
end),
receive
{'DOWN', Ref, process, Pid, normal} -> ok;
{'DOWN', Ref, process, Pid, What} -> error({failed_publish, What})
end;
do_publish(Payload, PublishFun, Config) ->
do_publish([Payload], PublishFun, Config).
%%--------------------------------------------------------------------
%% Test Cases
%%--------------------------------------------------------------------
%% [MQTT-3.1.2-23]
t_connect_session_expiry_interval(Config) ->
ConnFun = ?config(conn_fun, Config),
Topic = ?config(topic, Config),
STopic = ?config(stopic, Config),
Payload = <<"test message">>,
ClientId = ?config(client_id, Config),
{ok, Client1} = emqtt:start_link([ {clientid, ClientId},
{proto_ver, v5},
{properties, #{'Session-Expiry-Interval' => 30}}
| Config]),
{ok, _} = emqtt:ConnFun(Client1),
{ok, _, [2]} = emqtt:subscribe(Client1, STopic, qos2),
ok = emqtt:disconnect(Client1),
maybe_kill_connection_process(ClientId, Config),
publish(Topic, Payload, Config),
{ok, Client2} = emqtt:start_link([ {clientid, ClientId},
{proto_ver, v5},
{properties, #{'Session-Expiry-Interval' => 30}},
{clean_start, false}
| Config]),
{ok, _} = emqtt:ConnFun(Client2),
[Msg | _ ] = receive_messages(1),
?assertEqual({ok, iolist_to_binary(Topic)}, maps:find(topic, Msg)),
?assertEqual({ok, iolist_to_binary(Payload)}, maps:find(payload, Msg)),
?assertEqual({ok, 2}, maps:find(qos, Msg)),
ok = emqtt:disconnect(Client2).
t_without_client_id(Config) ->
process_flag(trap_exit, true), %% Emqtt client dies
ConnFun = ?config(conn_fun, Config),
{ok, Client0} = emqtt:start_link([ {proto_ver, v5},
{properties, #{'Session-Expiry-Interval' => 30}},
{clean_start, false}
| Config]),
{error, {client_identifier_not_valid, _}} = emqtt:ConnFun(Client0),
ok.
t_assigned_clientid_persistent_session(Config) ->
ConnFun = ?config(conn_fun, Config),
{ok, Client1} = emqtt:start_link([ {proto_ver, v5},
{properties, #{'Session-Expiry-Interval' => 30}},
{clean_start, true}
| Config]),
{ok, _} = emqtt:ConnFun(Client1),
AssignedClientId = client_info(clientid, Client1),
ok = emqtt:disconnect(Client1),
maybe_kill_connection_process(AssignedClientId, Config),
{ok, Client2} = emqtt:start_link([ {clientid, AssignedClientId},
{proto_ver, v5},
{clean_start, false}
| Config]),
{ok, _} = emqtt:ConnFun(Client2),
?assertEqual(1, client_info(session_present, Client2)),
ok = emqtt:disconnect(Client2).
t_cancel_on_disconnect(Config) ->
%% Open a persistent session, but cancel the persistence when
%% shutting down the connection.
ConnFun = ?config(conn_fun, Config),
ClientId = ?config(client_id, Config),
{ok, Client1} = emqtt:start_link([ {proto_ver, v5},
{clientid, ClientId},
{properties, #{'Session-Expiry-Interval' => 30}},
{clean_start, true}
| Config]),
{ok, _} = emqtt:ConnFun(Client1),
ok = emqtt:disconnect(Client1, 0, #{'Session-Expiry-Interval' => 0}),
{ok, Client2} = emqtt:start_link([ {clientid, ClientId},
{proto_ver, v5},
{clean_start, false},
{properties, #{'Session-Expiry-Interval' => 30}}
| Config]),
{ok, _} = emqtt:ConnFun(Client2),
?assertEqual(0, client_info(session_present, Client2)),
ok = emqtt:disconnect(Client2).
t_persist_on_disconnect(Config) ->
%% Open a non-persistent session, but add the persistence when
%% shutting down the connection. This is a protocol error, and
%% should not convert the session into a persistent session.
ConnFun = ?config(conn_fun, Config),
ClientId = ?config(client_id, Config),
{ok, Client1} = emqtt:start_link([ {proto_ver, v5},
{clientid, ClientId},
{properties, #{'Session-Expiry-Interval' => 0}},
{clean_start, true}
| Config]),
{ok, _} = emqtt:ConnFun(Client1),
%% Strangely enough, the disconnect is reported as successful by emqtt.
ok = emqtt:disconnect(Client1, 0, #{'Session-Expiry-Interval' => 30}),
{ok, Client2} = emqtt:start_link([ {clientid, ClientId},
{proto_ver, v5},
{clean_start, false},
{properties, #{'Session-Expiry-Interval' => 30}}
| Config]),
{ok, _} = emqtt:ConnFun(Client2),
%% The session should not be known, since it wasn't persisted because of the
%% changed expiry interval in the disconnect call.
?assertEqual(0, client_info(session_present, Client2)),
ok = emqtt:disconnect(Client2).
t_process_dies_session_expires(Config) ->
%% Emulate an error in the connect process,
%% or that the node of the process goes down.
%% A persistent session should eventually expire.
ConnFun = ?config(conn_fun, Config),
ClientId = ?config(client_id, Config),
{ok, Client1} = emqtt:start_link([ {proto_ver, v5},
{clientid, ClientId},
{properties, #{'Session-Expiry-Interval' => 1}},
{clean_start, true}
| Config]),
{ok, _} = emqtt:ConnFun(Client1),
ok = emqtt:disconnect(Client1),
maybe_kill_connection_process(ClientId, Config),
timer:sleep(1100),
{ok, Client2} = emqtt:start_link([ {proto_ver, v5},
{clientid, ClientId},
{clean_start, false}
| Config]),
{ok, _} = emqtt:ConnFun(Client2),
?assertEqual(0, client_info(session_present, Client2)),
emqtt:disconnect(Client2).
t_publish_while_client_is_gone(Config) ->
%% A persistent session should receive messages in its
%% subscription even if the process owning the session dies.
ConnFun = ?config(conn_fun, Config),
Topic = ?config(topic, Config),
STopic = ?config(stopic, Config),
Payload1 = <<"hello1">>,
Payload2 = <<"hello2">>,
ClientId = ?config(client_id, Config),
{ok, Client1} = emqtt:start_link([ {proto_ver, v5},
{clientid, ClientId},
{properties, #{'Session-Expiry-Interval' => 30}},
{clean_start, true}
| Config]),
{ok, _} = emqtt:ConnFun(Client1),
{ok, _, [2]} = emqtt:subscribe(Client1, STopic, qos2),
ok = emqtt:disconnect(Client1),
maybe_kill_connection_process(ClientId, Config),
ok = publish(Topic, [Payload1, Payload2], Config),
{ok, Client2} = emqtt:start_link([ {proto_ver, v5},
{clientid, ClientId},
{properties, #{'Session-Expiry-Interval' => 30}},
{clean_start, false}
| Config]),
{ok, _} = emqtt:ConnFun(Client2),
[Msg1] = receive_messages(1),
[Msg2] = receive_messages(1),
?assertEqual({ok, iolist_to_binary(Payload1)}, maps:find(payload, Msg1)),
?assertEqual({ok, 2}, maps:find(qos, Msg1)),
?assertEqual({ok, iolist_to_binary(Payload2)}, maps:find(payload, Msg2)),
?assertEqual({ok, 2}, maps:find(qos, Msg2)),
ok = emqtt:disconnect(Client2).
t_clean_start_drops_subscriptions(Config) ->
%% 1. A persistent session is started and disconnected.
%% 2. While disconnected, a message is published and persisted.
%% 3. When connecting again, the clean start flag is set, the subscription is renewed,
%% then we disconnect again.
%% 4. Finally, a new connection is made with clean start set to false.
%% The original message should not be delivered.
ConnFun = ?config(conn_fun, Config),
Topic = ?config(topic, Config),
STopic = ?config(stopic, Config),
Payload1 = <<"hello1">>,
Payload2 = <<"hello2">>,
Payload3 = <<"hello3">>,
ClientId = ?config(client_id, Config),
%% 1.
{ok, Client1} = emqtt:start_link([ {proto_ver, v5},
{clientid, ClientId},
{properties, #{'Session-Expiry-Interval' => 30}},
{clean_start, true}
| Config]),
{ok, _} = emqtt:ConnFun(Client1),
{ok, _, [2]} = emqtt:subscribe(Client1, STopic, qos2),
ok = emqtt:disconnect(Client1),
maybe_kill_connection_process(ClientId, Config),
%% 2.
ok = publish(Topic, Payload1, Config),
%% 3.
{ok, Client2} = emqtt:start_link([ {proto_ver, v5},
{clientid, ClientId},
{properties, #{'Session-Expiry-Interval' => 30}},
{clean_start, true}
| Config]),
{ok, _} = emqtt:ConnFun(Client2),
?assertEqual(0, client_info(session_present, Client2)),
{ok, _, [2]} = emqtt:subscribe(Client2, STopic, qos2),
ok = publish(Topic, Payload2, Config),
[Msg1] = receive_messages(1),
?assertEqual({ok, iolist_to_binary(Payload2)}, maps:find(payload, Msg1)),
ok = emqtt:disconnect(Client2),
maybe_kill_connection_process(ClientId, Config),
%% 4.
{ok, Client3} = emqtt:start_link([ {proto_ver, v5},
{clientid, ClientId},
{properties, #{'Session-Expiry-Interval' => 30}},
{clean_start, false}
| Config]),
{ok, _} = emqtt:ConnFun(Client3),
ok = publish(Topic, Payload3, Config),
[Msg2] = receive_messages(1),
?assertEqual({ok, iolist_to_binary(Payload3)}, maps:find(payload, Msg2)),
ok = emqtt:disconnect(Client3).
t_unsubscribe(Config) ->
ConnFun = ?config(conn_fun, Config),
Topic = ?config(topic, Config),
STopic = ?config(stopic, Config),
ClientId = ?config(client_id, Config),
{ok, Client} = emqtt:start_link([ {clientid, ClientId},
{proto_ver, v5},
{properties, #{'Session-Expiry-Interval' => 30}}
| Config]),
{ok, _} = emqtt:ConnFun(Client),
{ok, _, [2]} = emqtt:subscribe(Client, STopic, qos2),
case emqx_persistent_session:is_store_enabled() of
true ->
[Session] = emqx_persistent_session:lookup(ClientId),
SessionID = emqx_session:info(id, Session),
SessionIDs = [SId || #route{dest = SId} <- emqx_session_router:match_routes(Topic)],
?assert(lists:member(SessionID, SessionIDs)),
?assertMatch([_], [Sub || {ST, _} = Sub <- emqtt:subscriptions(Client), ST =:= STopic]),
{ok, _, _} = emqtt:unsubscribe(Client, STopic),
?assertMatch([], [Sub || {ST, _} = Sub <- emqtt:subscriptions(Client), ST =:= STopic]),
SessionIDs2 = [SId || #route{dest = SId} <- emqx_session_router:match_routes(Topic)],
?assert(not lists:member(SessionID, SessionIDs2));
false ->
?assertMatch([_], [Sub || {ST, _} = Sub <- emqtt:subscriptions(Client), ST =:= STopic]),
{ok, _, _} = emqtt:unsubscribe(Client, STopic),
?assertMatch([], [Sub || {ST, _} = Sub <- emqtt:subscriptions(Client), ST =:= STopic])
end,
ok = emqtt:disconnect(Client).
t_multiple_subscription_matches(Config) ->
ConnFun = ?config(conn_fun, Config),
Topic = ?config(topic, Config),
STopic1 = ?config(stopic, Config),
STopic2 = ?config(stopic_alt, Config),
Payload = <<"test message">>,
ClientId = ?config(client_id, Config),
{ok, Client1} = emqtt:start_link([ {clientid, ClientId},
{proto_ver, v5},
{properties, #{'Session-Expiry-Interval' => 30}}
| Config]),
{ok, _} = emqtt:ConnFun(Client1),
{ok, _, [2]} = emqtt:subscribe(Client1, STopic1, qos2),
{ok, _, [2]} = emqtt:subscribe(Client1, STopic2, qos2),
ok = emqtt:disconnect(Client1),
maybe_kill_connection_process(ClientId, Config),
publish(Topic, Payload, Config),
{ok, Client2} = emqtt:start_link([ {clientid, ClientId},
{proto_ver, v5},
{properties, #{'Session-Expiry-Interval' => 30}},
{clean_start, false}
| Config]),
{ok, _} = emqtt:ConnFun(Client2),
%% We will receive the same message twice because it matches two subscriptions.
[Msg1, Msg2] = receive_messages(2),
?assertEqual({ok, iolist_to_binary(Topic)}, maps:find(topic, Msg1)),
?assertEqual({ok, iolist_to_binary(Payload)}, maps:find(payload, Msg1)),
?assertEqual({ok, iolist_to_binary(Topic)}, maps:find(topic, Msg2)),
?assertEqual({ok, iolist_to_binary(Payload)}, maps:find(payload, Msg2)),
?assertEqual({ok, 2}, maps:find(qos, Msg1)),
?assertEqual({ok, 2}, maps:find(qos, Msg2)),
ok = emqtt:disconnect(Client2).
t_lost_messages_because_of_gc(init, Config) ->
case (emqx_persistent_session:is_store_enabled()
andalso ?config(kill_connection_process, Config)) of
true ->
Retain = 1000,
OldRetain = emqx_config:get(?msg_retain, Retain),
emqx_config:put(?msg_retain, Retain),
[{retain, Retain}, {old_retain, OldRetain}|Config];
false -> {skip, only_relevant_with_store}
end;
t_lost_messages_because_of_gc('end', Config) ->
OldRetain = ?config(old_retain, Config),
emqx_config:put(?msg_retain, OldRetain),
ok.
t_lost_messages_because_of_gc(Config) ->
ConnFun = ?config(conn_fun, Config),
Topic = ?config(topic, Config),
STopic = ?config(stopic, Config),
ClientId = ?config(client_id, Config),
Retain = ?config(retain, Config),
Payload1 = <<"hello1">>,
Payload2 = <<"hello2">>,
{ok, Client1} = emqtt:start_link([ {clientid, ClientId},
{proto_ver, v5},
{properties, #{'Session-Expiry-Interval' => 30}}
| Config]),
{ok, _} = emqtt:ConnFun(Client1),
{ok, _, [2]} = emqtt:subscribe(Client1, STopic, qos2),
emqtt:disconnect(Client1),
maybe_kill_connection_process(ClientId, Config),
publish(Topic, Payload1, Config),
timer:sleep(2 * Retain),
publish(Topic, Payload2, Config),
emqx_persistent_session_gc:message_gc_worker(),
{ok, Client2} = emqtt:start_link([ {clientid, ClientId},
{clean_start, false},
{proto_ver, v5},
{properties, #{'Session-Expiry-Interval' => 30}}
| Config]),
{ok, _} = emqtt:ConnFun(Client2),
Msgs = receive_messages(2),
?assertMatch([_], Msgs),
?assertEqual({ok, iolist_to_binary(Payload2)}, maps:find(payload, hd(Msgs))),
emqtt:disconnect(Client2),
ok.
%%--------------------------------------------------------------------
%% Snabbkaffe helpers
%%--------------------------------------------------------------------
check_snabbkaffe_vanilla(Trace) ->
ResumeTrace = [T || #{?snk_kind := K} = T <- Trace,
re:run(atom_to_list(K), "^ps_") /= nomatch],
?assertMatch([_|_], ResumeTrace),
[_Sid] = lists:usort(?projection(sid, ResumeTrace)),
%% Check internal flow of the emqx_cm resuming
?assert(?strict_causality(#{ ?snk_kind := ps_resuming },
#{ ?snk_kind := ps_initial_pendings },
ResumeTrace)),
?assert(?strict_causality(#{ ?snk_kind := ps_initial_pendings },
#{ ?snk_kind := ps_persist_pendings },
ResumeTrace)),
?assert(?strict_causality(#{ ?snk_kind := ps_persist_pendings },
#{ ?snk_kind := ps_notify_writers },
ResumeTrace)),
?assert(?strict_causality(#{ ?snk_kind := ps_notify_writers },
#{ ?snk_kind := ps_node_markers },
ResumeTrace)),
?assert(?strict_causality(#{ ?snk_kind := ps_node_markers },
#{ ?snk_kind := ps_resume_session },
ResumeTrace)),
?assert(?strict_causality(#{ ?snk_kind := ps_resume_session },
#{ ?snk_kind := ps_marker_pendings },
ResumeTrace)),
?assert(?strict_causality(#{ ?snk_kind := ps_marker_pendings },
#{ ?snk_kind := ps_marker_pendings_msgs },
ResumeTrace)),
?assert(?strict_causality(#{ ?snk_kind := ps_marker_pendings_msgs },
#{ ?snk_kind := ps_resume_end },
ResumeTrace)),
%% Check flow between worker and emqx_cm
?assert(?strict_causality(#{ ?snk_kind := ps_notify_writers },
#{ ?snk_kind := ps_worker_started },
ResumeTrace)),
?assert(?strict_causality(#{ ?snk_kind := ps_marker_pendings },
#{ ?snk_kind := ps_worker_resume_end },
ResumeTrace)),
?assert(?strict_causality(#{ ?snk_kind := ps_worker_resume_end },
#{ ?snk_kind := ps_worker_shutdown },
ResumeTrace)),
[Markers] = ?projection(markers, ?of_kind(ps_node_markers, Trace)),
?assertMatch([_], Markers).
%%--------------------------------------------------------------------
%% Snabbkaffe tests
%%--------------------------------------------------------------------
t_snabbkaffe_vanilla_stages(Config) ->
%% Test that all stages of session resume works ok in the simplest case
process_flag(trap_exit, true),
ConnFun = ?config(conn_fun, Config),
ClientId = ?config(client_id, Config),
EmqttOpts = [ {proto_ver, v5},
{clientid, ClientId},
{properties, #{'Session-Expiry-Interval' => 30}}
| Config],
{ok, Client1} = emqtt:start_link([{clean_start, true}|EmqttOpts]),
{ok, _} = emqtt:ConnFun(Client1),
ok = emqtt:disconnect(Client1),
maybe_kill_connection_process(ClientId, Config),
?check_trace(
begin
{ok, Client2} = emqtt:start_link([{clean_start, false}|EmqttOpts]),
{ok, _} = emqtt:ConnFun(Client2),
ok = emqtt:disconnect(Client2)
end,
fun(ok, Trace) ->
check_snabbkaffe_vanilla(Trace)
end),
ok.
t_snabbkaffe_pending_messages(Config) ->
%% Make sure there are pending messages are fetched during the init stage.
process_flag(trap_exit, true),
ConnFun = ?config(conn_fun, Config),
ClientId = ?config(client_id, Config),
Topic = ?config(topic, Config),
STopic = ?config(stopic, Config),
Payloads = [<<"test", (integer_to_binary(X))/binary>> || X <- [1,2,3,4,5]],
EmqttOpts = [ {proto_ver, v5},
{clientid, ClientId},
{properties, #{'Session-Expiry-Interval' => 30}}
| Config],
{ok, Client1} = emqtt:start_link([{clean_start, true} | EmqttOpts]),
{ok, _} = emqtt:ConnFun(Client1),
{ok, _, [2]} = emqtt:subscribe(Client1, STopic, qos2),
ok = emqtt:disconnect(Client1),
maybe_kill_connection_process(ClientId, Config),
?check_trace(
begin
snabbkaffe_sync_publish(Topic, Payloads, Config),
{ok, Client2} = emqtt:start_link([{clean_start, false} | EmqttOpts]),
{ok, _} = emqtt:ConnFun(Client2),
Msgs = receive_messages(length(Payloads)),
ReceivedPayloads = [P || #{ payload := P } <- Msgs],
?assertEqual(lists:sort(ReceivedPayloads), lists:sort(Payloads)),
ok = emqtt:disconnect(Client2)
end,
fun(ok, Trace) ->
check_snabbkaffe_vanilla(Trace),
%% Check that all messages was delivered from the DB
[Delivers1] = ?projection(msgs, ?of_kind(ps_persist_pendings_msgs, Trace)),
[Delivers2] = ?projection(msgs, ?of_kind(ps_marker_pendings_msgs, Trace)),
Delivers = Delivers1 ++ Delivers2,
?assertEqual(length(Payloads), length(Delivers)),
%% Check for no duplicates
?assertEqual(lists:usort(Delivers), lists:sort(Delivers))
end),
ok.
t_snabbkaffe_buffered_messages(Config) ->
%% Make sure to buffer messages during startup.
process_flag(trap_exit, true),
ConnFun = ?config(conn_fun, Config),
ClientId = ?config(client_id, Config),
Topic = ?config(topic, Config),
STopic = ?config(stopic, Config),
Payloads1 = [<<"test", (integer_to_binary(X))/binary>> || X <- [1, 2, 3]],
Payloads2 = [<<"test", (integer_to_binary(X))/binary>> || X <- [4, 5, 6]],
EmqttOpts = [ {proto_ver, v5},
{clientid, ClientId},
{properties, #{'Session-Expiry-Interval' => 30}}
| Config],
{ok, Client1} = emqtt:start_link([{clean_start, true} | EmqttOpts]),
{ok, _} = emqtt:ConnFun(Client1),
{ok, _, [2]} = emqtt:subscribe(Client1, STopic, qos2),
ok = emqtt:disconnect(Client1),
maybe_kill_connection_process(ClientId, Config),
publish(Topic, Payloads1, Config),
?check_trace(
begin
%% Make the resume init phase wait until the first message is delivered.
?force_ordering( #{ ?snk_kind := ps_worker_deliver },
#{ ?snk_kind := ps_resume_end }),
spawn_link(fun() ->
?block_until(#{ ?snk_kind := ps_marker_pendings_msgs }, infinity, 5000),
publish(Topic, Payloads2, Config)
end),
{ok, Client2} = emqtt:start_link([{clean_start, false} | EmqttOpts]),
{ok, _} = emqtt:ConnFun(Client2),
Msgs = receive_messages(length(Payloads1) + length(Payloads2) + 1),
ReceivedPayloads = [P || #{ payload := P } <- Msgs],
?assertEqual(lists:sort(Payloads1 ++ Payloads2),
lists:sort(ReceivedPayloads)),
ok = emqtt:disconnect(Client2)
end,
fun(ok, Trace) ->
check_snabbkaffe_vanilla(Trace),
%% Check that some messages was buffered in the writer process
[Msgs] = ?projection(msgs, ?of_kind(ps_writer_pendings, Trace)),
?assertMatch(X when 0 < X andalso X =< length(Payloads2),
length(Msgs))
end),
ok.
%%--------------------------------------------------------------------
%% GC tests
%%--------------------------------------------------------------------
-define(MARKER, 3).
-define(DELIVERED, 2).
-define(UNDELIVERED, 1).
-define(ABANDONED, 0).
msg_id() ->
emqx_guid:gen().
delivered_msg(MsgId, SessionID, STopic) ->
{SessionID, MsgId, STopic, ?DELIVERED}.
undelivered_msg(MsgId, SessionID, STopic) ->
{SessionID, MsgId, STopic, ?UNDELIVERED}.
marker_msg(MarkerID, SessionID) ->
{SessionID, MarkerID, <<>>, ?MARKER}.
guid(MicrosecondsAgo) ->
%% Make a fake GUID and set a timestamp.
<< TS:64, Tail:64 >> = emqx_guid:gen(),
<< (TS - MicrosecondsAgo) : 64, Tail:64 >>.
abandoned_session_msg(SessionID) ->
abandoned_session_msg(SessionID, 0).
abandoned_session_msg(SessionID, MicrosecondsAgo) ->
TS = erlang:system_time(microsecond),
{SessionID, <<>>, <<(TS - MicrosecondsAgo) : 64>>, ?ABANDONED}.
fresh_gc_delete_fun() ->
Ets = ets:new(gc_collect, [ordered_set]),
fun(delete, Key) -> ets:insert(Ets, {Key}), ok;
(collect, <<>>) -> List = ets:match(Ets, {'$1'}), ets:delete(Ets), lists:append(List);
(_, _Key) -> ok
end.
fresh_gc_callbacks_fun() ->
Ets = ets:new(gc_collect, [ordered_set]),
fun(collect, <<>>) -> List = ets:match(Ets, {'$1'}), ets:delete(Ets), lists:append(List);
(Tag, Key) -> ets:insert(Ets, {{Key, Tag}}), ok
end.
get_gc_delete_messages() ->
Fun = fresh_gc_delete_fun(),
emqx_persistent_session:gc_session_messages(Fun),
Fun(collect, <<>>).
get_gc_callbacks() ->
Fun = fresh_gc_callbacks_fun(),
emqx_persistent_session:gc_session_messages(Fun),
Fun(collect, <<>>).
t_gc_all_delivered(Config) ->
Store = ?config(session_msg_store, Config),
STopic = ?config(stopic, Config),
SessionId = emqx_guid:gen(),
MsgIds = [msg_id() || _ <- lists:seq(1, 5)],
Delivered = [delivered_msg(X, SessionId, STopic) || X <- MsgIds],
Undelivered = [undelivered_msg(X, SessionId, STopic) || X <- MsgIds],
SortedContent = lists:usort(Delivered ++ Undelivered),
ets:insert(Store, [{X, <<>>} || X <- SortedContent]),
GCMessages = get_gc_delete_messages(),
?assertEqual(SortedContent, GCMessages),
ok.
t_gc_some_undelivered(Config) ->
Store = ?config(session_msg_store, Config),
STopic = ?config(stopic, Config),
SessionId = emqx_guid:gen(),
MsgIds = [msg_id() || _ <- lists:seq(1, 10)],
Delivered = [delivered_msg(X, SessionId, STopic) || X <- MsgIds],
{Delivered1,_Delivered2} = split(Delivered),
Undelivered = [undelivered_msg(X, SessionId, STopic) || X <- MsgIds],
{Undelivered1, Undelivered2} = split(Undelivered),
Content = Delivered1 ++ Undelivered1 ++ Undelivered2,
ets:insert(Store, [{X, <<>>} || X <- Content]),
Expected = lists:usort(Delivered1 ++ Undelivered1),
GCMessages = get_gc_delete_messages(),
?assertEqual(Expected, GCMessages),
ok.
t_gc_with_markers(Config) ->
Store = ?config(session_msg_store, Config),
STopic = ?config(stopic, Config),
SessionId = emqx_guid:gen(),
MsgIds1 = [msg_id() || _ <- lists:seq(1, 10)],
MarkerId = msg_id(),
MsgIds = [msg_id() || _ <- lists:seq(1, 4)] ++ MsgIds1,
Delivered = [delivered_msg(X, SessionId, STopic) || X <- MsgIds],
{Delivered1,_Delivered2} = split(Delivered),
Undelivered = [undelivered_msg(X, SessionId, STopic) || X <- MsgIds],
{Undelivered1, Undelivered2} = split(Undelivered),
Markers = [marker_msg(MarkerId, SessionId)],
Content = Delivered1 ++ Undelivered1 ++ Undelivered2 ++ Markers,
ets:insert(Store, [{X, <<>>} || X <- Content]),
Expected = lists:usort(Delivered1 ++ Undelivered1),
GCMessages = get_gc_delete_messages(),
?assertEqual(Expected, GCMessages),
ok.
t_gc_abandoned_some_undelivered(Config) ->
Store = ?config(session_msg_store, Config),
STopic = ?config(stopic, Config),
SessionId = emqx_guid:gen(),
MsgIds = [msg_id() || _ <- lists:seq(1, 10)],
Delivered = [delivered_msg(X, SessionId, STopic) || X <- MsgIds],
{Delivered1,_Delivered2} = split(Delivered),
Undelivered = [undelivered_msg(X, SessionId, STopic) || X <- MsgIds],
{Undelivered1, Undelivered2} = split(Undelivered),
Abandoned = abandoned_session_msg(SessionId),
Content = Delivered1 ++ Undelivered1 ++ Undelivered2 ++ [Abandoned],
ets:insert(Store, [{X, <<>>} || X <- Content]),
Expected = lists:usort(Delivered1 ++ Undelivered1 ++ Undelivered2),
GCMessages = get_gc_delete_messages(),
?assertEqual(Expected, GCMessages),
ok.
t_gc_abandoned_only_called_on_empty_session(Config) ->
Store = ?config(session_msg_store, Config),
STopic = ?config(stopic, Config),
SessionId = emqx_guid:gen(),
MsgIds = [msg_id() || _ <- lists:seq(1, 10)],
Delivered = [delivered_msg(X, SessionId, STopic) || X <- MsgIds],
Undelivered = [undelivered_msg(X, SessionId, STopic) || X <- MsgIds],
Abandoned = abandoned_session_msg(SessionId),
Content = Delivered ++ Undelivered ++ [Abandoned],
ets:insert(Store, [{X, <<>>} || X <- Content]),
GCMessages = get_gc_callbacks(),
%% Since we had messages to delete, we don't expect to get the
%% callback on the abandoned session
?assertEqual([], [ X || {X, abandoned} <- GCMessages]),
%% But if we have only the abandoned session marker for this
%% session, it should be called.
ets:delete_all_objects(Store),
UndeliveredOtherSession = undelivered_msg(msg_id(), emqx_guid:gen(), <<"topic">>),
ets:insert(Store, [{X, <<>>} || X <- [Abandoned, UndeliveredOtherSession]]),
GCMessages2 = get_gc_callbacks(),
?assertEqual([Abandoned], [ X || {X, abandoned} <- GCMessages2]),
ok.
t_gc_session_gc_worker(init, Config) ->
meck:new(emqx_persistent_session, [passthrough, no_link]),
Config;
t_gc_session_gc_worker('end',_Config) ->
meck:unload(emqx_persistent_session),
ok.
t_gc_session_gc_worker(Config) ->
STopic = ?config(stopic, Config),
SessionID = emqx_guid:gen(),
MsgDeleted = delivered_msg(msg_id(), SessionID, STopic),
MarkerNotDeleted = marker_msg(msg_id(), SessionID),
MarkerDeleted = marker_msg(guid(120 * 1000 * 1000), SessionID),
AbandonedNotDeleted = abandoned_session_msg(SessionID),
AbandonedDeleted = abandoned_session_msg(SessionID, 500 * 1000 * 1000),
meck:expect(emqx_persistent_session, delete_session_message, fun(_Key) -> ok end),
emqx_persistent_session_gc:session_gc_worker(delete, MsgDeleted),
emqx_persistent_session_gc:session_gc_worker(marker, MarkerNotDeleted),
emqx_persistent_session_gc:session_gc_worker(marker, MarkerDeleted),
emqx_persistent_session_gc:session_gc_worker(abandoned, AbandonedDeleted),
emqx_persistent_session_gc:session_gc_worker(abandoned, AbandonedNotDeleted),
History = meck:history(emqx_persistent_session, self()),
DeleteCalls = [ Key || {_Pid, {_, delete_session_message, [Key]}, _Result}
<- History],
?assertEqual(lists:sort([MsgDeleted, AbandonedDeleted, MarkerDeleted]),
lists:sort(DeleteCalls)),
ok.
t_gc_message_gc(Config) ->
Topic = ?config(topic, Config),
ClientID = ?config(client_id, Config),
Store = ?config(msg_store, Config),
NewMsgs = [emqx_message:make(ClientID, Topic, integer_to_binary(P))
|| P <- lists:seq(6, 10)],
Retain = 60 * 1000,
emqx_config:put(?msg_retain, Retain),
Msgs1 = [emqx_message:make(ClientID, Topic, integer_to_binary(P))
|| P <- lists:seq(1, 5)],
OldMsgs = [M#message{id = guid(Retain*1000)} || M <- Msgs1],
ets:insert(Store, NewMsgs ++ OldMsgs),
?assertEqual(lists:sort(OldMsgs ++ NewMsgs), ets:tab2list(Store)),
ok = emqx_persistent_session_gc:message_gc_worker(),
?assertEqual(lists:sort(NewMsgs), ets:tab2list(Store)),
ok.
split(List) ->
split(List, [], []).
split([], L1, L2) ->
{L1, L2};
split([H], L1, L2) ->
{[H|L1], L2};
split([H1, H2|Left], L1, L2) ->
split(Left, [H1|L1], [H2|L2]).