chore(emqx): drop remnants of former session persistence impl
1. It is not functional anyway. 2. It blocks `emqx_session` refactoring in a few places.
This commit is contained in:
parent
2965fa6fcb
commit
cfb1bf1fa4
|
@ -46,7 +46,6 @@
|
||||||
{emqx_node_rebalance_purge,1}.
|
{emqx_node_rebalance_purge,1}.
|
||||||
{emqx_node_rebalance_status,1}.
|
{emqx_node_rebalance_status,1}.
|
||||||
{emqx_node_rebalance_status,2}.
|
{emqx_node_rebalance_status,2}.
|
||||||
{emqx_persistent_session,1}.
|
|
||||||
{emqx_persistent_session_ds,1}.
|
{emqx_persistent_session_ds,1}.
|
||||||
{emqx_plugins,1}.
|
{emqx_plugins,1}.
|
||||||
{emqx_prometheus,1}.
|
{emqx_prometheus,1}.
|
||||||
|
|
|
@ -38,7 +38,6 @@
|
||||||
|
|
||||||
start(_Type, _Args) ->
|
start(_Type, _Args) ->
|
||||||
ok = maybe_load_config(),
|
ok = maybe_load_config(),
|
||||||
ok = emqx_persistent_session:init_db_backend(),
|
|
||||||
_ = emqx_persistent_session_ds:init(),
|
_ = emqx_persistent_session_ds:init(),
|
||||||
ok = maybe_start_quicer(),
|
ok = maybe_start_quicer(),
|
||||||
ok = emqx_bpapi:start(),
|
ok = emqx_bpapi:start(),
|
||||||
|
|
|
@ -1,306 +0,0 @@
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
||||||
%%
|
|
||||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
%% you may not use this file except in compliance with the License.
|
|
||||||
%% You may obtain a copy of the License at
|
|
||||||
%%
|
|
||||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
%%
|
|
||||||
%% Unless required by applicable law or agreed to in writing, software
|
|
||||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
%% See the License for the specific language governing permissions and
|
|
||||||
%% limitations under the License.
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
-module(emqx_session_router).
|
|
||||||
|
|
||||||
-behaviour(gen_server).
|
|
||||||
|
|
||||||
-include("emqx.hrl").
|
|
||||||
-include("logger.hrl").
|
|
||||||
-include("types.hrl").
|
|
||||||
-include("persistent_session/emqx_persistent_session.hrl").
|
|
||||||
|
|
||||||
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
|
||||||
|
|
||||||
-export([
|
|
||||||
create_init_tab/0,
|
|
||||||
create_router_tab/1,
|
|
||||||
start_link/2
|
|
||||||
]).
|
|
||||||
|
|
||||||
%% Route APIs
|
|
||||||
-export([
|
|
||||||
delete_routes/2,
|
|
||||||
do_add_route/2,
|
|
||||||
do_delete_route/2,
|
|
||||||
match_routes/1
|
|
||||||
]).
|
|
||||||
|
|
||||||
-export([
|
|
||||||
buffer/3,
|
|
||||||
pending/2,
|
|
||||||
resume_begin/2,
|
|
||||||
resume_end/2
|
|
||||||
]).
|
|
||||||
|
|
||||||
-export([print_routes/1]).
|
|
||||||
|
|
||||||
%% gen_server callbacks
|
|
||||||
-export([
|
|
||||||
init/1,
|
|
||||||
handle_call/3,
|
|
||||||
handle_cast/2,
|
|
||||||
handle_info/2,
|
|
||||||
terminate/2,
|
|
||||||
code_change/3
|
|
||||||
]).
|
|
||||||
|
|
||||||
-type dest() :: node() | {emqx_types:group(), node()}.
|
|
||||||
|
|
||||||
-define(ROUTE_RAM_TAB, emqx_session_route_ram).
|
|
||||||
-define(ROUTE_DISC_TAB, emqx_session_route_disc).
|
|
||||||
|
|
||||||
-define(SESSION_INIT_TAB, session_init_tab).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Mnesia bootstrap
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
create_router_tab(disc) ->
|
|
||||||
create_table(?ROUTE_DISC_TAB, disc_copies);
|
|
||||||
create_router_tab(ram) ->
|
|
||||||
create_table(?ROUTE_RAM_TAB, ram_copies).
|
|
||||||
|
|
||||||
create_table(Tab, Storage) ->
|
|
||||||
ok = mria:create_table(Tab, [
|
|
||||||
{type, bag},
|
|
||||||
{rlog_shard, ?ROUTE_SHARD},
|
|
||||||
{storage, Storage},
|
|
||||||
{record_name, route},
|
|
||||||
{attributes, record_info(fields, route)},
|
|
||||||
{storage_properties, [
|
|
||||||
{ets, [
|
|
||||||
{read_concurrency, true},
|
|
||||||
{write_concurrency, true}
|
|
||||||
]}
|
|
||||||
]}
|
|
||||||
]).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Start a router
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
create_init_tab() ->
|
|
||||||
emqx_utils_ets:new(?SESSION_INIT_TAB, [
|
|
||||||
public,
|
|
||||||
{read_concurrency, true},
|
|
||||||
{write_concurrency, true}
|
|
||||||
]).
|
|
||||||
|
|
||||||
-spec start_link(atom(), pos_integer()) -> startlink_ret().
|
|
||||||
start_link(Pool, Id) ->
|
|
||||||
gen_server:start_link(
|
|
||||||
{local, emqx_utils:proc_name(?MODULE, Id)},
|
|
||||||
?MODULE,
|
|
||||||
[Pool, Id],
|
|
||||||
[{hibernate_after, 1000}]
|
|
||||||
).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Route APIs
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
-spec do_add_route(emqx_types:topic(), dest()) -> ok | {error, term()}.
|
|
||||||
do_add_route(Topic, SessionID) when is_binary(Topic) ->
|
|
||||||
Route = #route{topic = Topic, dest = SessionID},
|
|
||||||
case lists:member(Route, lookup_routes(Topic)) of
|
|
||||||
true ->
|
|
||||||
ok;
|
|
||||||
false ->
|
|
||||||
case emqx_topic:wildcard(Topic) of
|
|
||||||
true ->
|
|
||||||
Fun = fun emqx_router_utils:insert_session_trie_route/2,
|
|
||||||
emqx_router_utils:maybe_trans(
|
|
||||||
Fun,
|
|
||||||
[route_tab(), Route],
|
|
||||||
?PERSISTENT_SESSION_SHARD
|
|
||||||
);
|
|
||||||
false ->
|
|
||||||
emqx_router_utils:insert_direct_route(route_tab(), Route)
|
|
||||||
end
|
|
||||||
end.
|
|
||||||
|
|
||||||
%% @doc Match routes
|
|
||||||
-spec match_routes(emqx_types:topic()) -> [emqx_types:route()].
|
|
||||||
match_routes(Topic) when is_binary(Topic) ->
|
|
||||||
case match_trie(Topic) of
|
|
||||||
[] -> lookup_routes(Topic);
|
|
||||||
Matched -> lists:append([lookup_routes(To) || To <- [Topic | Matched]])
|
|
||||||
end.
|
|
||||||
|
|
||||||
%% Optimize: routing table will be replicated to all router nodes.
|
|
||||||
match_trie(Topic) ->
|
|
||||||
case emqx_trie:empty_session() of
|
|
||||||
true -> [];
|
|
||||||
false -> emqx_trie:match_session(Topic)
|
|
||||||
end.
|
|
||||||
|
|
||||||
%% Async
|
|
||||||
delete_routes(SessionID, Subscriptions) ->
|
|
||||||
cast(pick(SessionID), {delete_routes, SessionID, Subscriptions}).
|
|
||||||
|
|
||||||
-spec do_delete_route(emqx_types:topic(), dest()) -> ok | {error, term()}.
|
|
||||||
do_delete_route(Topic, SessionID) ->
|
|
||||||
Route = #route{topic = Topic, dest = SessionID},
|
|
||||||
case emqx_topic:wildcard(Topic) of
|
|
||||||
true ->
|
|
||||||
Fun = fun emqx_router_utils:delete_session_trie_route/2,
|
|
||||||
emqx_router_utils:maybe_trans(Fun, [route_tab(), Route], ?PERSISTENT_SESSION_SHARD);
|
|
||||||
false ->
|
|
||||||
emqx_router_utils:delete_direct_route(route_tab(), Route)
|
|
||||||
end.
|
|
||||||
|
|
||||||
%% @doc Print routes to a topic
|
|
||||||
-spec print_routes(emqx_types:topic()) -> ok.
|
|
||||||
print_routes(Topic) ->
|
|
||||||
lists:foreach(
|
|
||||||
fun(#route{topic = To, dest = SessionID}) ->
|
|
||||||
io:format("~s -> ~p~n", [To, SessionID])
|
|
||||||
end,
|
|
||||||
match_routes(Topic)
|
|
||||||
).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Session APIs
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
pending(SessionID, MarkerIDs) ->
|
|
||||||
call(pick(SessionID), {pending, SessionID, MarkerIDs}).
|
|
||||||
|
|
||||||
buffer(SessionID, STopic, Msg) ->
|
|
||||||
case emqx_utils_ets:lookup_value(?SESSION_INIT_TAB, SessionID) of
|
|
||||||
undefined -> ok;
|
|
||||||
Worker -> emqx_session_router_worker:buffer(Worker, STopic, Msg)
|
|
||||||
end.
|
|
||||||
|
|
||||||
-spec resume_begin(pid(), binary()) -> [{node(), emqx_guid:guid()}].
|
|
||||||
resume_begin(From, SessionID) when is_pid(From), is_binary(SessionID) ->
|
|
||||||
call(pick(SessionID), {resume_begin, From, SessionID}).
|
|
||||||
|
|
||||||
-spec resume_end(pid(), binary()) ->
|
|
||||||
{'ok', [emqx_types:message()]} | {'error', term()}.
|
|
||||||
resume_end(From, SessionID) when is_pid(From), is_binary(SessionID) ->
|
|
||||||
case emqx_utils_ets:lookup_value(?SESSION_INIT_TAB, SessionID) of
|
|
||||||
undefined ->
|
|
||||||
?tp(ps_session_not_found, #{sid => SessionID}),
|
|
||||||
{error, not_found};
|
|
||||||
Pid ->
|
|
||||||
Res = emqx_session_router_worker:resume_end(From, Pid, SessionID),
|
|
||||||
cast(pick(SessionID), {resume_end, SessionID, Pid}),
|
|
||||||
Res
|
|
||||||
end.
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Worker internals
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
call(Router, Msg) ->
|
|
||||||
gen_server:call(Router, Msg, infinity).
|
|
||||||
|
|
||||||
cast(Router, Msg) ->
|
|
||||||
gen_server:cast(Router, Msg).
|
|
||||||
|
|
||||||
pick(#route{dest = SessionID}) ->
|
|
||||||
gproc_pool:pick_worker(session_router_pool, SessionID);
|
|
||||||
pick(SessionID) when is_binary(SessionID) ->
|
|
||||||
gproc_pool:pick_worker(session_router_pool, SessionID).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% gen_server callbacks
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
init([Pool, Id]) ->
|
|
||||||
true = gproc_pool:connect_worker(Pool, {Pool, Id}),
|
|
||||||
{ok, #{pool => Pool, id => Id, pmon => emqx_pmon:new()}}.
|
|
||||||
|
|
||||||
handle_call({resume_begin, RemotePid, SessionID}, _From, State) ->
|
|
||||||
case init_resume_worker(RemotePid, SessionID, State) of
|
|
||||||
error ->
|
|
||||||
{reply, error, State};
|
|
||||||
{ok, Pid, State1} ->
|
|
||||||
ets:insert(?SESSION_INIT_TAB, {SessionID, Pid}),
|
|
||||||
MarkerID = emqx_persistent_session:mark_resume_begin(SessionID),
|
|
||||||
{reply, {ok, MarkerID}, State1}
|
|
||||||
end;
|
|
||||||
handle_call({pending, SessionID, MarkerIDs}, _From, State) ->
|
|
||||||
Res = emqx_persistent_session:pending_messages_in_db(SessionID, MarkerIDs),
|
|
||||||
{reply, Res, State};
|
|
||||||
handle_call(Req, _From, State) ->
|
|
||||||
?SLOG(error, #{msg => "unexpected_call", req => Req}),
|
|
||||||
{reply, ignored, State}.
|
|
||||||
|
|
||||||
handle_cast({delete_routes, SessionID, Subscriptions}, State) ->
|
|
||||||
%% TODO: Make a batch for deleting all routes.
|
|
||||||
Fun = fun(Topic, _) -> do_delete_route(Topic, SessionID) end,
|
|
||||||
ok = maps:foreach(Fun, Subscriptions),
|
|
||||||
{noreply, State};
|
|
||||||
handle_cast({resume_end, SessionID, Pid}, State) ->
|
|
||||||
case emqx_utils_ets:lookup_value(?SESSION_INIT_TAB, SessionID) of
|
|
||||||
undefined -> skip;
|
|
||||||
P when P =:= Pid -> ets:delete(?SESSION_INIT_TAB, SessionID);
|
|
||||||
P when is_pid(P) -> skip
|
|
||||||
end,
|
|
||||||
Pmon = emqx_pmon:demonitor(Pid, maps:get(pmon, State)),
|
|
||||||
_ = emqx_session_router_worker_sup:abort_worker(Pid),
|
|
||||||
{noreply, State#{pmon => Pmon}};
|
|
||||||
handle_cast(Msg, State) ->
|
|
||||||
?SLOG(error, #{msg => "unexpected_cast", cast => Msg}),
|
|
||||||
{noreply, State}.
|
|
||||||
|
|
||||||
handle_info(Info, State) ->
|
|
||||||
?SLOG(error, #{msg => "unexpected_info", info => Info}),
|
|
||||||
{noreply, State}.
|
|
||||||
|
|
||||||
terminate(_Reason, #{pool := Pool, id := Id}) ->
|
|
||||||
gproc_pool:disconnect_worker(Pool, {Pool, Id}).
|
|
||||||
|
|
||||||
code_change(_OldVsn, State, _Extra) ->
|
|
||||||
{ok, State}.
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Resume worker. A process that buffers the persisted messages during
|
|
||||||
%% initialisation of a resuming session.
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
init_resume_worker(RemotePid, SessionID, #{pmon := Pmon} = State) ->
|
|
||||||
case emqx_session_router_worker_sup:start_worker(SessionID, RemotePid) of
|
|
||||||
{error, What} ->
|
|
||||||
?SLOG(error, #{msg => "failed_to_start_resume_worker", reason => What}),
|
|
||||||
error;
|
|
||||||
{ok, Pid} ->
|
|
||||||
Pmon1 = emqx_pmon:monitor(Pid, Pmon),
|
|
||||||
case emqx_utils_ets:lookup_value(?SESSION_INIT_TAB, SessionID) of
|
|
||||||
undefined ->
|
|
||||||
{ok, Pid, State#{pmon => Pmon1}};
|
|
||||||
{_, OldPid} ->
|
|
||||||
Pmon2 = emqx_pmon:demonitor(OldPid, Pmon1),
|
|
||||||
_ = emqx_session_router_worker_sup:abort_worker(OldPid),
|
|
||||||
{ok, Pid, State#{pmon => Pmon2}}
|
|
||||||
end
|
|
||||||
end.
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Internal functions
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
lookup_routes(Topic) ->
|
|
||||||
ets:lookup(route_tab(), Topic).
|
|
||||||
|
|
||||||
route_tab() ->
|
|
||||||
case emqx_persistent_session:storage_type() of
|
|
||||||
disc -> ?ROUTE_DISC_TAB;
|
|
||||||
ram -> ?ROUTE_RAM_TAB
|
|
||||||
end.
|
|
|
@ -67,13 +67,11 @@ init([]) ->
|
||||||
KernelSup = child_spec(emqx_kernel_sup, supervisor),
|
KernelSup = child_spec(emqx_kernel_sup, supervisor),
|
||||||
RouterSup = child_spec(emqx_router_sup, supervisor),
|
RouterSup = child_spec(emqx_router_sup, supervisor),
|
||||||
BrokerSup = child_spec(emqx_broker_sup, supervisor),
|
BrokerSup = child_spec(emqx_broker_sup, supervisor),
|
||||||
SessionSup = child_spec(emqx_persistent_session_sup, supervisor),
|
|
||||||
CMSup = child_spec(emqx_cm_sup, supervisor),
|
CMSup = child_spec(emqx_cm_sup, supervisor),
|
||||||
SysSup = child_spec(emqx_sys_sup, supervisor),
|
SysSup = child_spec(emqx_sys_sup, supervisor),
|
||||||
Limiter = child_spec(emqx_limiter_sup, supervisor),
|
Limiter = child_spec(emqx_limiter_sup, supervisor),
|
||||||
Children =
|
Children =
|
||||||
[KernelSup] ++
|
[KernelSup] ++
|
||||||
[SessionSup || emqx_persistent_session:is_store_enabled()] ++
|
|
||||||
[RouterSup || emqx_boot:is_enabled(broker)] ++
|
[RouterSup || emqx_boot:is_enabled(broker)] ++
|
||||||
[BrokerSup || emqx_boot:is_enabled(broker)] ++
|
[BrokerSup || emqx_boot:is_enabled(broker)] ++
|
||||||
[CMSup || emqx_boot:is_enabled(broker)] ++
|
[CMSup || emqx_boot:is_enabled(broker)] ++
|
||||||
|
|
|
@ -1,562 +0,0 @@
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
||||||
%%
|
|
||||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
%% you may not use this file except in compliance with the License.
|
|
||||||
%% You may obtain a copy of the License at
|
|
||||||
%%
|
|
||||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
%%
|
|
||||||
%% Unless required by applicable law or agreed to in writing, software
|
|
||||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
%% See the License for the specific language governing permissions and
|
|
||||||
%% limitations under the License.
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
-module(emqx_persistent_session).
|
|
||||||
|
|
||||||
-export([
|
|
||||||
is_store_enabled/0,
|
|
||||||
init_db_backend/0,
|
|
||||||
storage_backend/0,
|
|
||||||
storage_type/0
|
|
||||||
]).
|
|
||||||
|
|
||||||
-export([
|
|
||||||
discard/2,
|
|
||||||
discard_if_present/1,
|
|
||||||
lookup/1,
|
|
||||||
persist/3,
|
|
||||||
persist_message/1,
|
|
||||||
pending/1,
|
|
||||||
pending/2,
|
|
||||||
resume/3
|
|
||||||
]).
|
|
||||||
|
|
||||||
-export([
|
|
||||||
add_subscription/3,
|
|
||||||
remove_subscription/3
|
|
||||||
]).
|
|
||||||
|
|
||||||
-export([
|
|
||||||
mark_as_delivered/2,
|
|
||||||
mark_resume_begin/1
|
|
||||||
]).
|
|
||||||
|
|
||||||
-export([
|
|
||||||
pending_messages_in_db/2,
|
|
||||||
delete_session_message/1,
|
|
||||||
gc_session_messages/1,
|
|
||||||
session_message_info/2
|
|
||||||
]).
|
|
||||||
|
|
||||||
-export([
|
|
||||||
delete_message/1,
|
|
||||||
first_message_id/0,
|
|
||||||
next_message_id/1
|
|
||||||
]).
|
|
||||||
|
|
||||||
-export_type([sess_msg_key/0]).
|
|
||||||
|
|
||||||
-include("emqx.hrl").
|
|
||||||
-include("emqx_channel.hrl").
|
|
||||||
-include("emqx_persistent_session.hrl").
|
|
||||||
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
|
||||||
|
|
||||||
-compile({inline, [is_store_enabled/0]}).
|
|
||||||
|
|
||||||
%% NOTE: Order is significant because of traversal order of the table.
|
|
||||||
-define(MARKER, 3).
|
|
||||||
-define(DELIVERED, 2).
|
|
||||||
-define(UNDELIVERED, 1).
|
|
||||||
-define(ABANDONED, 0).
|
|
||||||
|
|
||||||
-type bin_timestamp() :: <<_:64>>.
|
|
||||||
-opaque sess_msg_key() ::
|
|
||||||
{emqx_guid:guid(), emqx_guid:guid(), emqx_types:topic(), ?UNDELIVERED | ?DELIVERED}
|
|
||||||
| {emqx_guid:guid(), emqx_guid:guid(), <<>>, ?MARKER}
|
|
||||||
| {emqx_guid:guid(), <<>>, bin_timestamp(), ?ABANDONED}.
|
|
||||||
|
|
||||||
-type gc_traverse_fun() :: fun(('delete' | 'marker' | 'abandoned', sess_msg_key()) -> 'ok').
|
|
||||||
|
|
||||||
%% EMQX configuration keys
|
|
||||||
-define(conf_storage_backend, [persistent_session_store, backend, type]).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Init
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
init_db_backend() ->
|
|
||||||
case is_store_enabled() of
|
|
||||||
true ->
|
|
||||||
StorageType = storage_type(),
|
|
||||||
ok = emqx_trie:create_session_trie(StorageType),
|
|
||||||
ok = emqx_session_router:create_router_tab(StorageType),
|
|
||||||
case storage_backend() of
|
|
||||||
builtin ->
|
|
||||||
emqx_persistent_session_backend_builtin:create_tables(),
|
|
||||||
persistent_term:put(?db_backend_key, emqx_persistent_session_backend_builtin)
|
|
||||||
end,
|
|
||||||
ok;
|
|
||||||
false ->
|
|
||||||
persistent_term:put(?db_backend_key, emqx_persistent_session_backend_dummy),
|
|
||||||
ok
|
|
||||||
end.
|
|
||||||
|
|
||||||
is_store_enabled() ->
|
|
||||||
emqx_config:get(?is_enabled_key).
|
|
||||||
|
|
||||||
-spec storage_backend() -> builtin.
|
|
||||||
storage_backend() ->
|
|
||||||
emqx_config:get(?conf_storage_backend).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Session message ADT API
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
-spec session_message_info('timestamp' | 'session_id', sess_msg_key()) -> term().
|
|
||||||
session_message_info(timestamp, {_, <<>>, <<TS:64>>, ?ABANDONED}) -> TS;
|
|
||||||
session_message_info(timestamp, {_, GUID, _, _}) -> emqx_guid:timestamp(GUID);
|
|
||||||
session_message_info(session_id, {SessionID, _, _, _}) -> SessionID.
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% DB API
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
first_message_id() ->
|
|
||||||
?db_backend:first_message_id().
|
|
||||||
|
|
||||||
next_message_id(Key) ->
|
|
||||||
?db_backend:next_message_id(Key).
|
|
||||||
|
|
||||||
delete_message(Key) ->
|
|
||||||
?db_backend:delete_message(Key).
|
|
||||||
|
|
||||||
first_session_message() ->
|
|
||||||
?db_backend:first_session_message().
|
|
||||||
|
|
||||||
next_session_message(Key) ->
|
|
||||||
?db_backend:next_session_message(Key).
|
|
||||||
|
|
||||||
delete_session_message(Key) ->
|
|
||||||
?db_backend:delete_session_message(Key).
|
|
||||||
|
|
||||||
put_session_store(#session_store{} = SS) ->
|
|
||||||
?db_backend:put_session_store(SS).
|
|
||||||
|
|
||||||
delete_session_store(ClientID) ->
|
|
||||||
?db_backend:delete_session_store(ClientID).
|
|
||||||
|
|
||||||
lookup_session_store(ClientID) ->
|
|
||||||
?db_backend:lookup_session_store(ClientID).
|
|
||||||
|
|
||||||
put_session_message({_, _, _, _} = Key) ->
|
|
||||||
?db_backend:put_session_message(#session_msg{key = Key}).
|
|
||||||
|
|
||||||
put_message(Msg) ->
|
|
||||||
?db_backend:put_message(Msg).
|
|
||||||
|
|
||||||
get_message(MsgId) ->
|
|
||||||
?db_backend:get_message(MsgId).
|
|
||||||
|
|
||||||
pending_messages_in_db(SessionID, MarkerIds) ->
|
|
||||||
?db_backend:ro_transaction(pending_messages_fun(SessionID, MarkerIds)).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Session API
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
%% The timestamp (TS) is the last time a client interacted with the session,
|
|
||||||
%% or when the client disconnected.
|
|
||||||
-spec persist(
|
|
||||||
emqx_types:clientinfo(),
|
|
||||||
emqx_types:conninfo(),
|
|
||||||
emqx_session:session()
|
|
||||||
) -> emqx_session:session().
|
|
||||||
|
|
||||||
persist(#{clientid := ClientID}, ConnInfo, Session) ->
|
|
||||||
case ClientID == undefined orelse not emqx_session:info(is_persistent, Session) of
|
|
||||||
true ->
|
|
||||||
Session;
|
|
||||||
false ->
|
|
||||||
SS = #session_store{
|
|
||||||
client_id = ClientID,
|
|
||||||
expiry_interval = maps:get(expiry_interval, ConnInfo),
|
|
||||||
ts = timestamp_from_conninfo(ConnInfo),
|
|
||||||
session = Session
|
|
||||||
},
|
|
||||||
case persistent_session_status(SS) of
|
|
||||||
not_persistent ->
|
|
||||||
Session;
|
|
||||||
expired ->
|
|
||||||
discard(ClientID, Session);
|
|
||||||
persistent ->
|
|
||||||
put_session_store(SS),
|
|
||||||
Session
|
|
||||||
end
|
|
||||||
end.
|
|
||||||
|
|
||||||
timestamp_from_conninfo(ConnInfo) ->
|
|
||||||
case maps:get(disconnected_at, ConnInfo, undefined) of
|
|
||||||
undefined -> erlang:system_time(millisecond);
|
|
||||||
Disconnect -> Disconnect
|
|
||||||
end.
|
|
||||||
|
|
||||||
lookup(ClientID) when is_binary(ClientID) ->
|
|
||||||
case is_store_enabled() of
|
|
||||||
false ->
|
|
||||||
none;
|
|
||||||
true ->
|
|
||||||
case lookup_session_store(ClientID) of
|
|
||||||
none ->
|
|
||||||
none;
|
|
||||||
{value, #session_store{session = S} = SS} ->
|
|
||||||
case persistent_session_status(SS) of
|
|
||||||
expired -> {expired, S};
|
|
||||||
persistent -> {persistent, S}
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end.
|
|
||||||
|
|
||||||
-spec discard_if_present(binary()) -> 'ok'.
|
|
||||||
discard_if_present(ClientID) ->
|
|
||||||
case lookup(ClientID) of
|
|
||||||
none ->
|
|
||||||
ok;
|
|
||||||
{Tag, Session} when Tag =:= persistent; Tag =:= expired ->
|
|
||||||
_ = discard(ClientID, Session),
|
|
||||||
ok
|
|
||||||
end.
|
|
||||||
|
|
||||||
-spec discard(binary(), emqx_session:session()) -> emqx_session:session().
|
|
||||||
discard(ClientID, Session) ->
|
|
||||||
discard_opt(is_store_enabled(), ClientID, Session).
|
|
||||||
|
|
||||||
discard_opt(false, _ClientID, Session) ->
|
|
||||||
emqx_session:set_field(is_persistent, false, Session);
|
|
||||||
discard_opt(true, ClientID, Session) ->
|
|
||||||
delete_session_store(ClientID),
|
|
||||||
SessionID = emqx_session:info(id, Session),
|
|
||||||
put_session_message({SessionID, <<>>, <<(erlang:system_time(microsecond)):64>>, ?ABANDONED}),
|
|
||||||
Subscriptions = emqx_session:info(subscriptions, Session),
|
|
||||||
emqx_session_router:delete_routes(SessionID, Subscriptions),
|
|
||||||
emqx_session:set_field(is_persistent, false, Session).
|
|
||||||
|
|
||||||
-spec mark_resume_begin(emqx_session:session_id()) -> emqx_guid:guid().
|
|
||||||
mark_resume_begin(SessionID) ->
|
|
||||||
MarkerID = emqx_guid:gen(),
|
|
||||||
put_session_message({SessionID, MarkerID, <<>>, ?MARKER}),
|
|
||||||
MarkerID.
|
|
||||||
|
|
||||||
add_subscription(TopicFilter, SessionID, true = _IsPersistent) ->
|
|
||||||
case is_store_enabled() of
|
|
||||||
true -> emqx_session_router:do_add_route(TopicFilter, SessionID);
|
|
||||||
false -> ok
|
|
||||||
end;
|
|
||||||
add_subscription(_TopicFilter, _SessionID, false = _IsPersistent) ->
|
|
||||||
ok.
|
|
||||||
|
|
||||||
remove_subscription(TopicFilter, SessionID, true = _IsPersistent) ->
|
|
||||||
case is_store_enabled() of
|
|
||||||
true -> emqx_session_router:do_delete_route(TopicFilter, SessionID);
|
|
||||||
false -> ok
|
|
||||||
end;
|
|
||||||
remove_subscription(_TopicFilter, _SessionID, false = _IsPersistent) ->
|
|
||||||
ok.
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Resuming from DB state
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
%% Must be called inside a emqx_cm_locker transaction.
|
|
||||||
-spec resume(emqx_types:clientinfo(), emqx_types:conninfo(), emqx_session:session()) ->
|
|
||||||
{emqx_session:session(), [emqx_types:deliver()]}.
|
|
||||||
resume(ClientInfo, ConnInfo, Session) ->
|
|
||||||
SessionID = emqx_session:info(id, Session),
|
|
||||||
?tp(ps_resuming, #{from => db, sid => SessionID}),
|
|
||||||
|
|
||||||
%% NOTE: Order is important!
|
|
||||||
|
|
||||||
%% 1. Get pending messages from DB.
|
|
||||||
?tp(ps_initial_pendings, #{sid => SessionID}),
|
|
||||||
Pendings1 = pending(SessionID),
|
|
||||||
?tp(ps_got_initial_pendings, #{
|
|
||||||
sid => SessionID,
|
|
||||||
msgs => Pendings1
|
|
||||||
}),
|
|
||||||
|
|
||||||
%% 2. Enqueue messages to mimic that the process was alive
|
|
||||||
%% when the messages were delivered.
|
|
||||||
?tp(ps_persist_pendings, #{sid => SessionID}),
|
|
||||||
Session1 = emqx_session:enqueue(ClientInfo, Pendings1, Session),
|
|
||||||
Session2 = persist(ClientInfo, ConnInfo, Session1),
|
|
||||||
mark_as_delivered(SessionID, Pendings1),
|
|
||||||
?tp(ps_persist_pendings_msgs, #{
|
|
||||||
msgs => Pendings1,
|
|
||||||
sid => SessionID
|
|
||||||
}),
|
|
||||||
|
|
||||||
%% 3. Notify writers that we are resuming.
|
|
||||||
%% They will buffer new messages.
|
|
||||||
?tp(ps_notify_writers, #{sid => SessionID}),
|
|
||||||
Nodes = mria:running_nodes(),
|
|
||||||
NodeMarkers = resume_begin(Nodes, SessionID),
|
|
||||||
?tp(ps_node_markers, #{sid => SessionID, markers => NodeMarkers}),
|
|
||||||
|
|
||||||
%% 4. Subscribe to topics.
|
|
||||||
?tp(ps_resume_session, #{sid => SessionID}),
|
|
||||||
ok = emqx_session:resume(ClientInfo, Session2),
|
|
||||||
|
|
||||||
%% 5. Get pending messages from DB until we find all markers.
|
|
||||||
?tp(ps_marker_pendings, #{sid => SessionID}),
|
|
||||||
MarkerIDs = [Marker || {_, Marker} <- NodeMarkers],
|
|
||||||
Pendings2 = pending(SessionID, MarkerIDs),
|
|
||||||
?tp(ps_marker_pendings_msgs, #{
|
|
||||||
sid => SessionID,
|
|
||||||
msgs => Pendings2
|
|
||||||
}),
|
|
||||||
|
|
||||||
%% 6. Get pending messages from writers.
|
|
||||||
?tp(ps_resume_end, #{sid => SessionID}),
|
|
||||||
WriterPendings = resume_end(Nodes, SessionID),
|
|
||||||
?tp(ps_writer_pendings, #{
|
|
||||||
msgs => WriterPendings,
|
|
||||||
sid => SessionID
|
|
||||||
}),
|
|
||||||
|
|
||||||
%% 7. Drain the inbox and usort the messages
|
|
||||||
%% with the pending messages. (Should be done by caller.)
|
|
||||||
{Session2, Pendings2 ++ WriterPendings}.
|
|
||||||
|
|
||||||
resume_begin(Nodes, SessionID) ->
|
|
||||||
Res = emqx_persistent_session_proto_v1:resume_begin(Nodes, self(), SessionID),
|
|
||||||
[{Node, Marker} || {{ok, {ok, Marker}}, Node} <- lists:zip(Res, Nodes)].
|
|
||||||
|
|
||||||
resume_end(Nodes, SessionID) ->
|
|
||||||
Res = emqx_persistent_session_proto_v1:resume_end(Nodes, self(), SessionID),
|
|
||||||
?tp(ps_erpc_multical_result, #{res => Res, sid => SessionID}),
|
|
||||||
%% TODO: Should handle the errors
|
|
||||||
[
|
|
||||||
{deliver, STopic, M}
|
|
||||||
|| {ok, {ok, Messages}} <- Res,
|
|
||||||
{{M, STopic}} <- Messages
|
|
||||||
].
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Messages API
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
persist_message(Msg) ->
|
|
||||||
case is_store_enabled() of
|
|
||||||
true -> do_persist_message(Msg);
|
|
||||||
false -> ok
|
|
||||||
end.
|
|
||||||
|
|
||||||
do_persist_message(Msg) ->
|
|
||||||
case emqx_message:get_flag(dup, Msg) orelse emqx_message:is_sys(Msg) of
|
|
||||||
true ->
|
|
||||||
ok;
|
|
||||||
false ->
|
|
||||||
case emqx_session_router:match_routes(emqx_message:topic(Msg)) of
|
|
||||||
[] ->
|
|
||||||
ok;
|
|
||||||
Routes ->
|
|
||||||
put_message(Msg),
|
|
||||||
MsgId = emqx_message:id(Msg),
|
|
||||||
persist_message_routes(Routes, MsgId, Msg)
|
|
||||||
end
|
|
||||||
end.
|
|
||||||
|
|
||||||
persist_message_routes([#route{dest = SessionID, topic = STopic} | Left], MsgId, Msg) ->
|
|
||||||
?tp(ps_persist_msg, #{sid => SessionID, payload => emqx_message:payload(Msg)}),
|
|
||||||
put_session_message({SessionID, MsgId, STopic, ?UNDELIVERED}),
|
|
||||||
emqx_session_router:buffer(SessionID, STopic, Msg),
|
|
||||||
persist_message_routes(Left, MsgId, Msg);
|
|
||||||
persist_message_routes([], _MsgId, _Msg) ->
|
|
||||||
ok.
|
|
||||||
|
|
||||||
mark_as_delivered(SessionID, List) ->
|
|
||||||
case is_store_enabled() of
|
|
||||||
true -> do_mark_as_delivered(SessionID, List);
|
|
||||||
false -> ok
|
|
||||||
end.
|
|
||||||
|
|
||||||
do_mark_as_delivered(SessionID, [{deliver, STopic, Msg} | Left]) ->
|
|
||||||
MsgID = emqx_message:id(Msg),
|
|
||||||
case next_session_message({SessionID, MsgID, STopic, ?ABANDONED}) of
|
|
||||||
{SessionID, MsgID, STopic, ?UNDELIVERED} = Key ->
|
|
||||||
%% We can safely delete this entry
|
|
||||||
%% instead of marking it as delivered.
|
|
||||||
delete_session_message(Key);
|
|
||||||
_ ->
|
|
||||||
put_session_message({SessionID, MsgID, STopic, ?DELIVERED})
|
|
||||||
end,
|
|
||||||
do_mark_as_delivered(SessionID, Left);
|
|
||||||
do_mark_as_delivered(_SessionID, []) ->
|
|
||||||
ok.
|
|
||||||
|
|
||||||
-spec pending(emqx_session:session_id()) ->
|
|
||||||
[{emqx_types:message(), STopic :: binary()}].
|
|
||||||
pending(SessionID) ->
|
|
||||||
pending_messages_in_db(SessionID, []).
|
|
||||||
|
|
||||||
-spec pending(emqx_session:session_id(), MarkerIDs :: [emqx_guid:guid()]) ->
|
|
||||||
[{emqx_types:message(), STopic :: binary()}].
|
|
||||||
pending(SessionID, MarkerIds) ->
|
|
||||||
%% TODO: Handle lost MarkerIDs
|
|
||||||
case emqx_session_router:pending(SessionID, MarkerIds) of
|
|
||||||
incomplete ->
|
|
||||||
timer:sleep(10),
|
|
||||||
pending(SessionID, MarkerIds);
|
|
||||||
Delivers ->
|
|
||||||
Delivers
|
|
||||||
end.
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Session internal functions
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
%% @private [MQTT-3.1.2-23]
|
|
||||||
persistent_session_status(#session_store{expiry_interval = 0}) ->
|
|
||||||
not_persistent;
|
|
||||||
persistent_session_status(#session_store{expiry_interval = ?EXPIRE_INTERVAL_INFINITE}) ->
|
|
||||||
persistent;
|
|
||||||
persistent_session_status(#session_store{expiry_interval = E, ts = TS}) ->
|
|
||||||
case E + TS > erlang:system_time(millisecond) of
|
|
||||||
true -> persistent;
|
|
||||||
false -> expired
|
|
||||||
end.
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Pending messages internal functions
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
pending_messages_fun(SessionID, MarkerIds) ->
|
|
||||||
fun() ->
|
|
||||||
case pending_messages({SessionID, <<>>, <<>>, ?DELIVERED}, [], MarkerIds) of
|
|
||||||
{Pending, []} -> read_pending_msgs(Pending, []);
|
|
||||||
{_Pending, [_ | _]} -> incomplete
|
|
||||||
end
|
|
||||||
end.
|
|
||||||
|
|
||||||
read_pending_msgs([{MsgId, STopic} | Left], Acc) ->
|
|
||||||
Acc1 =
|
|
||||||
try
|
|
||||||
[{deliver, STopic, get_message(MsgId)} | Acc]
|
|
||||||
catch
|
|
||||||
error:{msg_not_found, _} ->
|
|
||||||
HighwaterMark =
|
|
||||||
erlang:system_time(microsecond) -
|
|
||||||
emqx_config:get(?msg_retain) * 1000,
|
|
||||||
case emqx_guid:timestamp(MsgId) < HighwaterMark of
|
|
||||||
%% Probably cleaned by GC
|
|
||||||
true -> Acc;
|
|
||||||
false -> error({msg_not_found, MsgId})
|
|
||||||
end
|
|
||||||
end,
|
|
||||||
read_pending_msgs(Left, Acc1);
|
|
||||||
read_pending_msgs([], Acc) ->
|
|
||||||
lists:reverse(Acc).
|
|
||||||
|
|
||||||
%% The keys are ordered by
|
|
||||||
%% {session_id(), <<>>, bin_timestamp(), ?ABANDONED} For abandoned sessions (clean started or expired).
|
|
||||||
%% {session_id(), emqx_guid:guid(), STopic :: binary(), ?DELIVERED | ?UNDELIVERED | ?MARKER}
|
|
||||||
%% where
|
|
||||||
%% <<>> < emqx_guid:guid()
|
|
||||||
%% <<>> < bin_timestamp()
|
|
||||||
%% emqx_guid:guid() is ordered in ts() and by node()
|
|
||||||
%% ?ABANDONED < ?UNDELIVERED < ?DELIVERED < ?MARKER
|
|
||||||
%%
|
|
||||||
%% We traverse the table until we reach another session.
|
|
||||||
%% TODO: Garbage collect the delivered messages.
|
|
||||||
pending_messages({SessionID, PrevMsgId, PrevSTopic, PrevTag} = PrevKey, Acc, MarkerIds) ->
|
|
||||||
case next_session_message(PrevKey) of
|
|
||||||
{S, <<>>, _TS, ?ABANDONED} when S =:= SessionID ->
|
|
||||||
{[], []};
|
|
||||||
{S, MsgId, <<>>, ?MARKER} = Key when S =:= SessionID ->
|
|
||||||
MarkerIds1 = MarkerIds -- [MsgId],
|
|
||||||
case PrevTag =:= ?UNDELIVERED of
|
|
||||||
false -> pending_messages(Key, Acc, MarkerIds1);
|
|
||||||
true -> pending_messages(Key, [{PrevMsgId, PrevSTopic} | Acc], MarkerIds1)
|
|
||||||
end;
|
|
||||||
{S, MsgId, STopic, ?DELIVERED} = Key when
|
|
||||||
S =:= SessionID,
|
|
||||||
MsgId =:= PrevMsgId,
|
|
||||||
STopic =:= PrevSTopic
|
|
||||||
->
|
|
||||||
pending_messages(Key, Acc, MarkerIds);
|
|
||||||
{S, _MsgId, _STopic, _Tag} = Key when S =:= SessionID ->
|
|
||||||
case PrevTag =:= ?UNDELIVERED of
|
|
||||||
false -> pending_messages(Key, Acc, MarkerIds);
|
|
||||||
true -> pending_messages(Key, [{PrevMsgId, PrevSTopic} | Acc], MarkerIds)
|
|
||||||
end;
|
|
||||||
%% Next session_id or '$end_of_table'
|
|
||||||
_What ->
|
|
||||||
case PrevTag =:= ?UNDELIVERED of
|
|
||||||
false -> {lists:reverse(Acc), MarkerIds};
|
|
||||||
true -> {lists:reverse([{PrevMsgId, PrevSTopic} | Acc]), MarkerIds}
|
|
||||||
end
|
|
||||||
end.
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Garbage collection
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
-spec gc_session_messages(gc_traverse_fun()) -> 'ok'.
|
|
||||||
gc_session_messages(Fun) ->
|
|
||||||
gc_traverse(first_session_message(), <<>>, false, Fun).
|
|
||||||
|
|
||||||
gc_traverse('$end_of_table', _SessionID, _Abandoned, _Fun) ->
|
|
||||||
ok;
|
|
||||||
gc_traverse({S, <<>>, _TS, ?ABANDONED} = Key, _SessionID, _Abandoned, Fun) ->
|
|
||||||
%% Only report the abandoned session if it has no messages.
|
|
||||||
%% We want to keep the abandoned marker to last to make the GC reentrant.
|
|
||||||
case next_session_message(Key) of
|
|
||||||
'$end_of_table' = NextKey ->
|
|
||||||
ok = Fun(abandoned, Key),
|
|
||||||
gc_traverse(NextKey, S, true, Fun);
|
|
||||||
{S2, _, _, _} = NextKey when S =:= S2 ->
|
|
||||||
gc_traverse(NextKey, S, true, Fun);
|
|
||||||
{_, _, _, _} = NextKey ->
|
|
||||||
ok = Fun(abandoned, Key),
|
|
||||||
gc_traverse(NextKey, S, true, Fun)
|
|
||||||
end;
|
|
||||||
gc_traverse({S, _MsgID, <<>>, ?MARKER} = Key, SessionID, Abandoned, Fun) ->
|
|
||||||
ok = Fun(marker, Key),
|
|
||||||
NewAbandoned = S =:= SessionID andalso Abandoned,
|
|
||||||
gc_traverse(next_session_message(Key), S, NewAbandoned, Fun);
|
|
||||||
gc_traverse({S, _MsgID, _STopic, _Tag} = Key, SessionID, Abandoned, Fun) when
|
|
||||||
Abandoned andalso
|
|
||||||
S =:= SessionID
|
|
||||||
->
|
|
||||||
%% Delete all messages from an abandoned session.
|
|
||||||
ok = Fun(delete, Key),
|
|
||||||
gc_traverse(next_session_message(Key), S, Abandoned, Fun);
|
|
||||||
gc_traverse({S, MsgID, STopic, ?UNDELIVERED} = Key, SessionID, Abandoned, Fun) ->
|
|
||||||
case next_session_message(Key) of
|
|
||||||
{S1, M, ST, ?DELIVERED} = NextKey when
|
|
||||||
S1 =:= S andalso
|
|
||||||
MsgID =:= M andalso
|
|
||||||
STopic =:= ST
|
|
||||||
->
|
|
||||||
%% We have both markers for the same message/topic so it is safe to delete both.
|
|
||||||
ok = Fun(delete, Key),
|
|
||||||
ok = Fun(delete, NextKey),
|
|
||||||
gc_traverse(next_session_message(NextKey), S, Abandoned, Fun);
|
|
||||||
NextKey ->
|
|
||||||
%% Something else is here, so let's just loop.
|
|
||||||
NewAbandoned = S =:= SessionID andalso Abandoned,
|
|
||||||
gc_traverse(NextKey, SessionID, NewAbandoned, Fun)
|
|
||||||
end;
|
|
||||||
gc_traverse({S, _MsgID, _STopic, ?DELIVERED} = Key, SessionID, Abandoned, Fun) ->
|
|
||||||
%% We have a message that is marked as ?DELIVERED, but the ?UNDELIVERED is missing.
|
|
||||||
NewAbandoned = S =:= SessionID andalso Abandoned,
|
|
||||||
gc_traverse(next_session_message(Key), S, NewAbandoned, Fun).
|
|
||||||
|
|
||||||
-spec storage_type() -> ram | disc.
|
|
||||||
storage_type() ->
|
|
||||||
case emqx_config:get(?on_disc_key) of
|
|
||||||
true -> disc;
|
|
||||||
false -> ram
|
|
||||||
end.
|
|
|
@ -1,41 +0,0 @@
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
||||||
%%
|
|
||||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
%% you may not use this file except in compliance with the License.
|
|
||||||
%% You may obtain a copy of the License at
|
|
||||||
%%
|
|
||||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
%%
|
|
||||||
%% Unless required by applicable law or agreed to in writing, software
|
|
||||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
%% See the License for the specific language governing permissions and
|
|
||||||
%% limitations under the License.
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
-define(PERSISTENT_SESSION_SHARD, emqx_persistent_session_shard).
|
|
||||||
|
|
||||||
-record(session_store, {
|
|
||||||
client_id :: binary(),
|
|
||||||
expiry_interval :: non_neg_integer(),
|
|
||||||
ts :: non_neg_integer(),
|
|
||||||
session :: emqx_session:session()
|
|
||||||
}).
|
|
||||||
|
|
||||||
-record(session_msg, {
|
|
||||||
key :: emqx_persistent_session:sess_msg_key(),
|
|
||||||
val = [] :: []
|
|
||||||
}).
|
|
||||||
|
|
||||||
-define(cfg_root, persistent_session_store).
|
|
||||||
-define(db_backend_key, [?cfg_root, db_backend]).
|
|
||||||
-define(is_enabled_key, [?cfg_root, enabled]).
|
|
||||||
-define(msg_retain, [?cfg_root, max_retain_undelivered]).
|
|
||||||
-define(on_disc_key, [?cfg_root, on_disc]).
|
|
||||||
|
|
||||||
-define(SESSION_STORE, emqx_session_store).
|
|
||||||
-define(SESS_MSG_TAB, emqx_session_msg).
|
|
||||||
-define(MSG_TAB, emqx_persistent_msg).
|
|
||||||
|
|
||||||
-define(db_backend, (persistent_term:get(?db_backend_key))).
|
|
|
@ -1,157 +0,0 @@
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
||||||
%%
|
|
||||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
%% you may not use this file except in compliance with the License.
|
|
||||||
%% You may obtain a copy of the License at
|
|
||||||
%%
|
|
||||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
%%
|
|
||||||
%% Unless required by applicable law or agreed to in writing, software
|
|
||||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
%% See the License for the specific language governing permissions and
|
|
||||||
%% limitations under the License.
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
-module(emqx_persistent_session_backend_builtin).
|
|
||||||
|
|
||||||
-include("emqx.hrl").
|
|
||||||
-include_lib("typerefl/include/types.hrl").
|
|
||||||
-include("emqx_persistent_session.hrl").
|
|
||||||
|
|
||||||
-export([
|
|
||||||
create_tables/0,
|
|
||||||
first_message_id/0,
|
|
||||||
next_message_id/1,
|
|
||||||
delete_message/1,
|
|
||||||
first_session_message/0,
|
|
||||||
next_session_message/1,
|
|
||||||
delete_session_message/1,
|
|
||||||
put_session_store/1,
|
|
||||||
delete_session_store/1,
|
|
||||||
lookup_session_store/1,
|
|
||||||
put_session_message/1,
|
|
||||||
put_message/1,
|
|
||||||
get_message/1,
|
|
||||||
ro_transaction/1
|
|
||||||
]).
|
|
||||||
|
|
||||||
-type mria_table_type() :: ram_copies | disc_copies | rocksdb_copies.
|
|
||||||
|
|
||||||
-define(IS_ETS(BACKEND), (BACKEND =:= ram_copies orelse BACKEND =:= disc_copies)).
|
|
||||||
|
|
||||||
create_tables() ->
|
|
||||||
SessStoreBackend = table_type(session),
|
|
||||||
ok = mria:create_table(?SESSION_STORE, [
|
|
||||||
{type, set},
|
|
||||||
{rlog_shard, ?PERSISTENT_SESSION_SHARD},
|
|
||||||
{storage, SessStoreBackend},
|
|
||||||
{record_name, session_store},
|
|
||||||
{attributes, record_info(fields, session_store)},
|
|
||||||
{storage_properties, storage_properties(?SESSION_STORE, SessStoreBackend)}
|
|
||||||
]),
|
|
||||||
|
|
||||||
SessMsgBackend = table_type(session_messages),
|
|
||||||
ok = mria:create_table(?SESS_MSG_TAB, [
|
|
||||||
{type, ordered_set},
|
|
||||||
{rlog_shard, ?PERSISTENT_SESSION_SHARD},
|
|
||||||
{storage, SessMsgBackend},
|
|
||||||
{record_name, session_msg},
|
|
||||||
{attributes, record_info(fields, session_msg)},
|
|
||||||
{storage_properties, storage_properties(?SESS_MSG_TAB, SessMsgBackend)}
|
|
||||||
]),
|
|
||||||
|
|
||||||
MsgBackend = table_type(messages),
|
|
||||||
ok = mria:create_table(?MSG_TAB, [
|
|
||||||
{type, ordered_set},
|
|
||||||
{rlog_shard, ?PERSISTENT_SESSION_SHARD},
|
|
||||||
{storage, MsgBackend},
|
|
||||||
{record_name, message},
|
|
||||||
{attributes, record_info(fields, message)},
|
|
||||||
{storage_properties, storage_properties(?MSG_TAB, MsgBackend)}
|
|
||||||
]).
|
|
||||||
|
|
||||||
first_session_message() ->
|
|
||||||
mnesia:dirty_first(?SESS_MSG_TAB).
|
|
||||||
|
|
||||||
next_session_message(Key) ->
|
|
||||||
mnesia:dirty_next(?SESS_MSG_TAB, Key).
|
|
||||||
|
|
||||||
first_message_id() ->
|
|
||||||
mnesia:dirty_first(?MSG_TAB).
|
|
||||||
|
|
||||||
next_message_id(Key) ->
|
|
||||||
mnesia:dirty_next(?MSG_TAB, Key).
|
|
||||||
|
|
||||||
delete_message(Key) ->
|
|
||||||
mria:dirty_delete(?MSG_TAB, Key).
|
|
||||||
|
|
||||||
delete_session_message(Key) ->
|
|
||||||
mria:dirty_delete(?SESS_MSG_TAB, Key).
|
|
||||||
|
|
||||||
put_session_store(SS) ->
|
|
||||||
mria:dirty_write(?SESSION_STORE, SS).
|
|
||||||
|
|
||||||
delete_session_store(ClientID) ->
|
|
||||||
mria:dirty_delete(?SESSION_STORE, ClientID).
|
|
||||||
|
|
||||||
lookup_session_store(ClientID) ->
|
|
||||||
case mnesia:dirty_read(?SESSION_STORE, ClientID) of
|
|
||||||
[] -> none;
|
|
||||||
[SS] -> {value, SS}
|
|
||||||
end.
|
|
||||||
|
|
||||||
put_session_message(SessMsg) ->
|
|
||||||
mria:dirty_write(?SESS_MSG_TAB, SessMsg).
|
|
||||||
|
|
||||||
put_message(Msg) ->
|
|
||||||
mria:dirty_write(?MSG_TAB, Msg).
|
|
||||||
|
|
||||||
get_message(MsgId) ->
|
|
||||||
case mnesia:read(?MSG_TAB, MsgId) of
|
|
||||||
[] -> error({msg_not_found, MsgId});
|
|
||||||
[Msg] -> Msg
|
|
||||||
end.
|
|
||||||
|
|
||||||
ro_transaction(Fun) ->
|
|
||||||
{atomic, Res} = mria:ro_transaction(?PERSISTENT_SESSION_SHARD, Fun),
|
|
||||||
Res.
|
|
||||||
|
|
||||||
-spec storage_properties(?SESSION_STORE | ?SESS_MSG_TAB | ?MSG_TAB, mria_table_type()) -> term().
|
|
||||||
storage_properties(?SESSION_STORE, Backend) when ?IS_ETS(Backend) ->
|
|
||||||
[{ets, [{read_concurrency, true}]}];
|
|
||||||
storage_properties(_, Backend) when ?IS_ETS(Backend) ->
|
|
||||||
[
|
|
||||||
{ets, [
|
|
||||||
{read_concurrency, true},
|
|
||||||
{write_concurrency, true}
|
|
||||||
]}
|
|
||||||
];
|
|
||||||
storage_properties(_, _) ->
|
|
||||||
[].
|
|
||||||
|
|
||||||
%% Dialyzer sees the compiled literal in
|
|
||||||
%% `mria:rocksdb_backend_available/0' and complains about the
|
|
||||||
%% complementar match arm...
|
|
||||||
-dialyzer({no_match, table_type/1}).
|
|
||||||
-spec table_type(atom()) -> mria_table_type().
|
|
||||||
table_type(Table) ->
|
|
||||||
DiscPersistence = emqx_config:get([?cfg_root, on_disc]),
|
|
||||||
RamCache = get_overlayed(Table, ram_cache),
|
|
||||||
RocksDBAvailable = mria:rocksdb_backend_available(),
|
|
||||||
case {DiscPersistence, RamCache, RocksDBAvailable} of
|
|
||||||
{true, true, _} ->
|
|
||||||
disc_copies;
|
|
||||||
{true, false, true} ->
|
|
||||||
rocksdb_copies;
|
|
||||||
{true, false, false} ->
|
|
||||||
disc_copies;
|
|
||||||
{false, _, _} ->
|
|
||||||
ram_copies
|
|
||||||
end.
|
|
||||||
|
|
||||||
-spec get_overlayed(atom(), on_disc | ram_cache) -> boolean().
|
|
||||||
get_overlayed(Table, Suffix) ->
|
|
||||||
Default = emqx_config:get([?cfg_root, Suffix]),
|
|
||||||
emqx_config:get([?cfg_root, backend, Table, Suffix], Default).
|
|
|
@ -1,76 +0,0 @@
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
||||||
%%
|
|
||||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
%% you may not use this file except in compliance with the License.
|
|
||||||
%% You may obtain a copy of the License at
|
|
||||||
%%
|
|
||||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
%%
|
|
||||||
%% Unless required by applicable law or agreed to in writing, software
|
|
||||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
%% See the License for the specific language governing permissions and
|
|
||||||
%% limitations under the License.
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
-module(emqx_persistent_session_backend_dummy).
|
|
||||||
|
|
||||||
-include("emqx_persistent_session.hrl").
|
|
||||||
|
|
||||||
-export([
|
|
||||||
first_message_id/0,
|
|
||||||
next_message_id/1,
|
|
||||||
delete_message/1,
|
|
||||||
first_session_message/0,
|
|
||||||
next_session_message/1,
|
|
||||||
delete_session_message/1,
|
|
||||||
put_session_store/1,
|
|
||||||
delete_session_store/1,
|
|
||||||
lookup_session_store/1,
|
|
||||||
put_session_message/1,
|
|
||||||
put_message/1,
|
|
||||||
get_message/1,
|
|
||||||
ro_transaction/1
|
|
||||||
]).
|
|
||||||
|
|
||||||
first_message_id() ->
|
|
||||||
'$end_of_table'.
|
|
||||||
|
|
||||||
next_message_id(_) ->
|
|
||||||
'$end_of_table'.
|
|
||||||
|
|
||||||
-spec delete_message(binary()) -> no_return().
|
|
||||||
delete_message(_Key) ->
|
|
||||||
error(should_not_be_called).
|
|
||||||
|
|
||||||
first_session_message() ->
|
|
||||||
'$end_of_table'.
|
|
||||||
|
|
||||||
next_session_message(_Key) ->
|
|
||||||
'$end_of_table'.
|
|
||||||
|
|
||||||
delete_session_message(_Key) ->
|
|
||||||
ok.
|
|
||||||
|
|
||||||
put_session_store(#session_store{}) ->
|
|
||||||
ok.
|
|
||||||
|
|
||||||
delete_session_store(_ClientID) ->
|
|
||||||
ok.
|
|
||||||
|
|
||||||
lookup_session_store(_ClientID) ->
|
|
||||||
none.
|
|
||||||
|
|
||||||
put_session_message({_, _, _, _}) ->
|
|
||||||
ok.
|
|
||||||
|
|
||||||
put_message(_Msg) ->
|
|
||||||
ok.
|
|
||||||
|
|
||||||
-spec get_message(binary()) -> no_return().
|
|
||||||
get_message(_MsgId) ->
|
|
||||||
error(should_not_be_called).
|
|
||||||
|
|
||||||
ro_transaction(Fun) ->
|
|
||||||
Fun().
|
|
|
@ -1,163 +0,0 @@
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
||||||
%%
|
|
||||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
%% you may not use this file except in compliance with the License.
|
|
||||||
%% You may obtain a copy of the License at
|
|
||||||
%%
|
|
||||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
%%
|
|
||||||
%% Unless required by applicable law or agreed to in writing, software
|
|
||||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
%% See the License for the specific language governing permissions and
|
|
||||||
%% limitations under the License.
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
-module(emqx_persistent_session_gc).
|
|
||||||
|
|
||||||
-behaviour(gen_server).
|
|
||||||
|
|
||||||
-include("emqx_persistent_session.hrl").
|
|
||||||
|
|
||||||
%% API
|
|
||||||
-export([start_link/0]).
|
|
||||||
|
|
||||||
%% gen_server callbacks
|
|
||||||
-export([
|
|
||||||
init/1,
|
|
||||||
handle_call/3,
|
|
||||||
handle_cast/2,
|
|
||||||
handle_info/2,
|
|
||||||
terminate/2
|
|
||||||
]).
|
|
||||||
|
|
||||||
-ifdef(TEST).
|
|
||||||
-export([
|
|
||||||
session_gc_worker/2,
|
|
||||||
message_gc_worker/0
|
|
||||||
]).
|
|
||||||
-endif.
|
|
||||||
|
|
||||||
-define(SERVER, ?MODULE).
|
|
||||||
%% TODO: Maybe these should be configurable?
|
|
||||||
-define(MARKER_GRACE_PERIOD, 60000000).
|
|
||||||
-define(ABANDONED_GRACE_PERIOD, 300000000).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% API
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
start_link() ->
|
|
||||||
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% gen_server callbacks
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
init([]) ->
|
|
||||||
process_flag(trap_exit, true),
|
|
||||||
mria_rlog:ensure_shard(?PERSISTENT_SESSION_SHARD),
|
|
||||||
{ok, start_message_gc_timer(start_session_gc_timer(#{}))}.
|
|
||||||
|
|
||||||
handle_call(_Request, _From, State) ->
|
|
||||||
Reply = ok,
|
|
||||||
{reply, Reply, State}.
|
|
||||||
|
|
||||||
handle_cast(_Request, State) ->
|
|
||||||
{noreply, State}.
|
|
||||||
|
|
||||||
handle_info({timeout, Ref, session_gc_timeout}, State) ->
|
|
||||||
State1 = session_gc_timeout(Ref, State),
|
|
||||||
{noreply, State1};
|
|
||||||
handle_info({timeout, Ref, message_gc_timeout}, State) ->
|
|
||||||
State1 = message_gc_timeout(Ref, State),
|
|
||||||
{noreply, State1};
|
|
||||||
handle_info(_Info, State) ->
|
|
||||||
{noreply, State}.
|
|
||||||
|
|
||||||
terminate(_Reason, _State) ->
|
|
||||||
ok.
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Internal functions
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Session messages GC
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
start_session_gc_timer(State) ->
|
|
||||||
Interval = emqx_config:get([persistent_session_store, session_message_gc_interval]),
|
|
||||||
State#{session_gc_timer => erlang:start_timer(Interval, self(), session_gc_timeout)}.
|
|
||||||
|
|
||||||
session_gc_timeout(Ref, #{session_gc_timer := R} = State) when R =:= Ref ->
|
|
||||||
%% Prevent overlapping processes.
|
|
||||||
GCPid = maps:get(session_gc_pid, State, undefined),
|
|
||||||
case GCPid =/= undefined andalso erlang:is_process_alive(GCPid) of
|
|
||||||
true ->
|
|
||||||
start_session_gc_timer(State);
|
|
||||||
false ->
|
|
||||||
start_session_gc_timer(State#{
|
|
||||||
session_gc_pid => proc_lib:spawn_link(fun session_gc_worker/0)
|
|
||||||
})
|
|
||||||
end;
|
|
||||||
session_gc_timeout(_Ref, State) ->
|
|
||||||
State.
|
|
||||||
|
|
||||||
session_gc_worker() ->
|
|
||||||
ok = emqx_persistent_session:gc_session_messages(fun session_gc_worker/2).
|
|
||||||
|
|
||||||
session_gc_worker(delete, Key) ->
|
|
||||||
emqx_persistent_session:delete_session_message(Key);
|
|
||||||
session_gc_worker(marker, Key) ->
|
|
||||||
TS = emqx_persistent_session:session_message_info(timestamp, Key),
|
|
||||||
case TS + ?MARKER_GRACE_PERIOD < erlang:system_time(microsecond) of
|
|
||||||
true -> emqx_persistent_session:delete_session_message(Key);
|
|
||||||
false -> ok
|
|
||||||
end;
|
|
||||||
session_gc_worker(abandoned, Key) ->
|
|
||||||
TS = emqx_persistent_session:session_message_info(timestamp, Key),
|
|
||||||
case TS + ?ABANDONED_GRACE_PERIOD < erlang:system_time(microsecond) of
|
|
||||||
true -> emqx_persistent_session:delete_session_message(Key);
|
|
||||||
false -> ok
|
|
||||||
end.
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Message GC
|
|
||||||
%% --------------------------------------------------------------------
|
|
||||||
%% The message GC simply removes all messages older than the retain
|
|
||||||
%% period. A more exact GC would either involve treating the session
|
|
||||||
%% message table as root set, or some kind of reference counting.
|
|
||||||
%% We sacrifice space for simplicity at this point.
|
|
||||||
start_message_gc_timer(State) ->
|
|
||||||
Interval = emqx_config:get([persistent_session_store, session_message_gc_interval]),
|
|
||||||
State#{message_gc_timer => erlang:start_timer(Interval, self(), message_gc_timeout)}.
|
|
||||||
|
|
||||||
message_gc_timeout(Ref, #{message_gc_timer := R} = State) when R =:= Ref ->
|
|
||||||
%% Prevent overlapping processes.
|
|
||||||
GCPid = maps:get(message_gc_pid, State, undefined),
|
|
||||||
case GCPid =/= undefined andalso erlang:is_process_alive(GCPid) of
|
|
||||||
true ->
|
|
||||||
start_message_gc_timer(State);
|
|
||||||
false ->
|
|
||||||
start_message_gc_timer(State#{
|
|
||||||
message_gc_pid => proc_lib:spawn_link(fun message_gc_worker/0)
|
|
||||||
})
|
|
||||||
end;
|
|
||||||
message_gc_timeout(_Ref, State) ->
|
|
||||||
State.
|
|
||||||
|
|
||||||
message_gc_worker() ->
|
|
||||||
HighWaterMark = erlang:system_time(microsecond) - emqx_config:get(?msg_retain) * 1000,
|
|
||||||
message_gc_worker(emqx_persistent_session:first_message_id(), HighWaterMark).
|
|
||||||
|
|
||||||
message_gc_worker('$end_of_table', _HighWaterMark) ->
|
|
||||||
ok;
|
|
||||||
message_gc_worker(MsgId, HighWaterMark) ->
|
|
||||||
case emqx_guid:timestamp(MsgId) < HighWaterMark of
|
|
||||||
true ->
|
|
||||||
emqx_persistent_session:delete_message(MsgId),
|
|
||||||
message_gc_worker(emqx_persistent_session:next_message_id(MsgId), HighWaterMark);
|
|
||||||
false ->
|
|
||||||
ok
|
|
||||||
end.
|
|
|
@ -1,69 +0,0 @@
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
||||||
%%
|
|
||||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
%% you may not use this file except in compliance with the License.
|
|
||||||
%% You may obtain a copy of the License at
|
|
||||||
%%
|
|
||||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
%%
|
|
||||||
%% Unless required by applicable law or agreed to in writing, software
|
|
||||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
%% See the License for the specific language governing permissions and
|
|
||||||
%% limitations under the License.
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
-module(emqx_persistent_session_sup).
|
|
||||||
|
|
||||||
-behaviour(supervisor).
|
|
||||||
|
|
||||||
-export([start_link/0]).
|
|
||||||
|
|
||||||
-export([init/1]).
|
|
||||||
|
|
||||||
start_link() ->
|
|
||||||
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
|
|
||||||
|
|
||||||
init([]) ->
|
|
||||||
%% We want this supervisor to own the table for restarts
|
|
||||||
SessionTab = emqx_session_router:create_init_tab(),
|
|
||||||
|
|
||||||
%% Resume worker sup
|
|
||||||
ResumeSup = #{
|
|
||||||
id => router_worker_sup,
|
|
||||||
start => {emqx_session_router_worker_sup, start_link, [SessionTab]},
|
|
||||||
restart => permanent,
|
|
||||||
shutdown => 2000,
|
|
||||||
type => supervisor,
|
|
||||||
modules => [emqx_session_router_worker_sup]
|
|
||||||
},
|
|
||||||
|
|
||||||
SessionRouterPool = emqx_pool_sup:spec(
|
|
||||||
session_router_pool,
|
|
||||||
[
|
|
||||||
session_router_pool,
|
|
||||||
hash,
|
|
||||||
{emqx_session_router, start_link, []}
|
|
||||||
]
|
|
||||||
),
|
|
||||||
|
|
||||||
GCWorker = child_spec(emqx_persistent_session_gc, worker),
|
|
||||||
|
|
||||||
Spec = #{
|
|
||||||
strategy => one_for_all,
|
|
||||||
intensity => 0,
|
|
||||||
period => 1
|
|
||||||
},
|
|
||||||
|
|
||||||
{ok, {Spec, [ResumeSup, SessionRouterPool, GCWorker]}}.
|
|
||||||
|
|
||||||
child_spec(Mod, worker) ->
|
|
||||||
#{
|
|
||||||
id => Mod,
|
|
||||||
start => {Mod, start_link, []},
|
|
||||||
restart => permanent,
|
|
||||||
shutdown => 15000,
|
|
||||||
type => worker,
|
|
||||||
modules => [Mod]
|
|
||||||
}.
|
|
|
@ -1,41 +0,0 @@
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
||||||
%%
|
|
||||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
%% you may not use this file except in compliance with the License.
|
|
||||||
%% You may obtain a copy of the License at
|
|
||||||
%%
|
|
||||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
%%
|
|
||||||
%% Unless required by applicable law or agreed to in writing, software
|
|
||||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
%% See the License for the specific language governing permissions and
|
|
||||||
%% limitations under the License.
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
-module(emqx_persistent_session_proto_v1).
|
|
||||||
|
|
||||||
-behaviour(emqx_bpapi).
|
|
||||||
|
|
||||||
-export([
|
|
||||||
introduced_in/0,
|
|
||||||
resume_begin/3,
|
|
||||||
resume_end/3
|
|
||||||
]).
|
|
||||||
|
|
||||||
-include("bpapi.hrl").
|
|
||||||
-include("emqx.hrl").
|
|
||||||
|
|
||||||
introduced_in() ->
|
|
||||||
"5.0.0".
|
|
||||||
|
|
||||||
-spec resume_begin([node()], pid(), binary()) ->
|
|
||||||
emqx_rpc:erpc_multicall([{node(), emqx_guid:guid()}]).
|
|
||||||
resume_begin(Nodes, Pid, SessionID) when is_pid(Pid), is_binary(SessionID) ->
|
|
||||||
erpc:multicall(Nodes, emqx_session_router, resume_begin, [Pid, SessionID]).
|
|
||||||
|
|
||||||
-spec resume_end([node()], pid(), binary()) ->
|
|
||||||
emqx_rpc:erpc_multicall({'ok', [emqx_types:message()]} | {'error', term()}).
|
|
||||||
resume_end(Nodes, Pid, SessionID) when is_pid(Pid), is_binary(SessionID) ->
|
|
||||||
erpc:multicall(Nodes, emqx_session_router, resume_end, [Pid, SessionID]).
|
|
|
@ -53,11 +53,13 @@
|
||||||
-define(IGNORED_MODULES, "emqx_rpc").
|
-define(IGNORED_MODULES, "emqx_rpc").
|
||||||
-define(FORCE_DELETED_MODULES, [
|
-define(FORCE_DELETED_MODULES, [
|
||||||
emqx_statsd,
|
emqx_statsd,
|
||||||
emqx_statsd_proto_v1
|
emqx_statsd_proto_v1,
|
||||||
|
emqx_persistent_session_proto_v1
|
||||||
]).
|
]).
|
||||||
-define(FORCE_DELETED_APIS, [
|
-define(FORCE_DELETED_APIS, [
|
||||||
{emqx_statsd, 1},
|
{emqx_statsd, 1},
|
||||||
{emqx_plugin_libs, 1}
|
{emqx_plugin_libs, 1},
|
||||||
|
{emqx_persistent_session, 1}
|
||||||
]).
|
]).
|
||||||
%% List of known RPC backend modules:
|
%% List of known RPC backend modules:
|
||||||
-define(RPC_MODULES, "gen_rpc, erpc, rpc, emqx_rpc").
|
-define(RPC_MODULES, "gen_rpc, erpc, rpc, emqx_rpc").
|
||||||
|
|
|
@ -20,7 +20,6 @@
|
||||||
-include_lib("common_test/include/ct.hrl").
|
-include_lib("common_test/include/ct.hrl").
|
||||||
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||||
-include_lib("../include/emqx.hrl").
|
-include_lib("../include/emqx.hrl").
|
||||||
-include("../src/persistent_session/emqx_persistent_session.hrl").
|
|
||||||
|
|
||||||
-compile(export_all).
|
-compile(export_all).
|
||||||
-compile(nowarn_export_all).
|
-compile(nowarn_export_all).
|
||||||
|
@ -51,76 +50,23 @@ all() ->
|
||||||
|
|
||||||
groups() ->
|
groups() ->
|
||||||
TCs = emqx_common_test_helpers:all(?MODULE),
|
TCs = emqx_common_test_helpers:all(?MODULE),
|
||||||
SnabbkaffeTCs = [TC || TC <- TCs, is_snabbkaffe_tc(TC)],
|
|
||||||
GCTests = [TC || TC <- TCs, is_gc_tc(TC)],
|
|
||||||
OtherTCs = (TCs -- SnabbkaffeTCs) -- GCTests,
|
|
||||||
[
|
[
|
||||||
{persistent_store_enabled, [
|
|
||||||
{group, ram_tables},
|
|
||||||
{group, disc_tables}
|
|
||||||
]},
|
|
||||||
{persistent_store_disabled, [{group, no_kill_connection_process}]},
|
{persistent_store_disabled, [{group, no_kill_connection_process}]},
|
||||||
{ram_tables, [], [
|
|
||||||
{group, no_kill_connection_process},
|
|
||||||
{group, kill_connection_process},
|
|
||||||
{group, snabbkaffe},
|
|
||||||
{group, gc_tests}
|
|
||||||
]},
|
|
||||||
{disc_tables, [], [
|
|
||||||
{group, no_kill_connection_process},
|
|
||||||
{group, kill_connection_process},
|
|
||||||
{group, snabbkaffe},
|
|
||||||
{group, gc_tests}
|
|
||||||
]},
|
|
||||||
{no_kill_connection_process, [], [{group, tcp}, {group, quic}, {group, ws}]},
|
{no_kill_connection_process, [], [{group, tcp}, {group, quic}, {group, ws}]},
|
||||||
{kill_connection_process, [], [{group, tcp}, {group, quic}, {group, ws}]},
|
{tcp, [], TCs},
|
||||||
{snabbkaffe, [], [
|
{quic, [], TCs},
|
||||||
{group, tcp_snabbkaffe}, {group, quic_snabbkaffe}, {group, ws_snabbkaffe}
|
{ws, [], TCs}
|
||||||
]},
|
|
||||||
{tcp, [], OtherTCs},
|
|
||||||
{quic, [], OtherTCs},
|
|
||||||
{ws, [], OtherTCs},
|
|
||||||
{tcp_snabbkaffe, [], SnabbkaffeTCs},
|
|
||||||
{quic_snabbkaffe, [], SnabbkaffeTCs},
|
|
||||||
{ws_snabbkaffe, [], SnabbkaffeTCs},
|
|
||||||
{gc_tests, [], GCTests}
|
|
||||||
].
|
].
|
||||||
|
|
||||||
is_snabbkaffe_tc(TC) ->
|
|
||||||
re:run(atom_to_list(TC), "^t_snabbkaffe_") /= nomatch.
|
|
||||||
|
|
||||||
is_gc_tc(TC) ->
|
|
||||||
re:run(atom_to_list(TC), "^t_gc_") /= nomatch.
|
|
||||||
|
|
||||||
init_per_group(persistent_store_enabled, Config) ->
|
|
||||||
[{persistent_store_enabled, true} | Config];
|
|
||||||
init_per_group(Group, Config) when Group =:= ram_tables; Group =:= disc_tables ->
|
|
||||||
%% Start Apps
|
|
||||||
Reply =
|
|
||||||
case Group =:= ram_tables of
|
|
||||||
true -> ram;
|
|
||||||
false -> disc
|
|
||||||
end,
|
|
||||||
emqx_common_test_helpers:boot_modules(all),
|
|
||||||
meck:new(emqx_config, [non_strict, passthrough, no_history, no_link]),
|
|
||||||
meck:expect(emqx_config, get, fun
|
|
||||||
(?on_disc_key) -> Reply =:= disc;
|
|
||||||
(?is_enabled_key) -> true;
|
|
||||||
(Other) -> meck:passthrough([Other])
|
|
||||||
end),
|
|
||||||
emqx_common_test_helpers:start_apps([], fun set_special_confs/1),
|
|
||||||
?assertEqual(true, emqx_persistent_session:is_store_enabled()),
|
|
||||||
Config;
|
|
||||||
init_per_group(persistent_store_disabled, Config) ->
|
init_per_group(persistent_store_disabled, Config) ->
|
||||||
%% Start Apps
|
%% Start Apps
|
||||||
emqx_common_test_helpers:boot_modules(all),
|
emqx_common_test_helpers:boot_modules(all),
|
||||||
meck:new(emqx_config, [non_strict, passthrough, no_history, no_link]),
|
meck:new(emqx_config, [non_strict, passthrough, no_history, no_link]),
|
||||||
meck:expect(emqx_config, get, fun
|
meck:expect(emqx_config, get, fun
|
||||||
(?is_enabled_key) -> false;
|
([persistent_session_store, enabled]) -> false;
|
||||||
(Other) -> meck:passthrough([Other])
|
(Other) -> meck:passthrough([Other])
|
||||||
end),
|
end),
|
||||||
emqx_common_test_helpers:start_apps([], fun set_special_confs/1),
|
emqx_common_test_helpers:start_apps([], fun set_special_confs/1),
|
||||||
?assertEqual(false, emqx_persistent_session:is_store_enabled()),
|
|
||||||
[{persistent_store_enabled, false} | Config];
|
[{persistent_store_enabled, false} | Config];
|
||||||
init_per_group(Group, Config) when Group == ws; Group == ws_snabbkaffe ->
|
init_per_group(Group, Config) when Group == ws; Group == ws_snabbkaffe ->
|
||||||
[
|
[
|
||||||
|
@ -140,43 +86,7 @@ init_per_group(Group, Config) when Group == quic; Group == quic_snabbkaffe ->
|
||||||
init_per_group(no_kill_connection_process, Config) ->
|
init_per_group(no_kill_connection_process, Config) ->
|
||||||
[{kill_connection_process, false} | Config];
|
[{kill_connection_process, false} | Config];
|
||||||
init_per_group(kill_connection_process, Config) ->
|
init_per_group(kill_connection_process, Config) ->
|
||||||
[{kill_connection_process, true} | Config];
|
[{kill_connection_process, true} | Config].
|
||||||
init_per_group(snabbkaffe, Config) ->
|
|
||||||
[{kill_connection_process, true} | Config];
|
|
||||||
init_per_group(gc_tests, Config) ->
|
|
||||||
%% We need to make sure the system does not interfere with this test group.
|
|
||||||
lists:foreach(
|
|
||||||
fun(ClientId) ->
|
|
||||||
maybe_kill_connection_process(ClientId, [{kill_connection_process, true}])
|
|
||||||
end,
|
|
||||||
emqx_cm:all_client_ids()
|
|
||||||
),
|
|
||||||
emqx_common_test_helpers:stop_apps([]),
|
|
||||||
SessionMsgEts = gc_tests_session_store,
|
|
||||||
MsgEts = gc_tests_msg_store,
|
|
||||||
Pid = spawn(fun() ->
|
|
||||||
ets:new(SessionMsgEts, [named_table, public, ordered_set]),
|
|
||||||
ets:new(MsgEts, [named_table, public, ordered_set, {keypos, 2}]),
|
|
||||||
receive
|
|
||||||
stop -> ok
|
|
||||||
end
|
|
||||||
end),
|
|
||||||
meck:new(mnesia, [non_strict, passthrough, no_history, no_link]),
|
|
||||||
meck:expect(mnesia, dirty_first, fun
|
|
||||||
(?SESS_MSG_TAB) -> ets:first(SessionMsgEts);
|
|
||||||
(?MSG_TAB) -> ets:first(MsgEts);
|
|
||||||
(X) -> meck:passthrough([X])
|
|
||||||
end),
|
|
||||||
meck:expect(mnesia, dirty_next, fun
|
|
||||||
(?SESS_MSG_TAB, X) -> ets:next(SessionMsgEts, X);
|
|
||||||
(?MSG_TAB, X) -> ets:next(MsgEts, X);
|
|
||||||
(Tab, X) -> meck:passthrough([Tab, X])
|
|
||||||
end),
|
|
||||||
meck:expect(mnesia, dirty_delete, fun
|
|
||||||
(?MSG_TAB, X) -> ets:delete(MsgEts, X);
|
|
||||||
(Tab, X) -> meck:passthrough([Tab, X])
|
|
||||||
end),
|
|
||||||
[{store_owner, Pid}, {session_msg_store, SessionMsgEts}, {msg_store, MsgEts} | Config].
|
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
Config.
|
Config.
|
||||||
|
@ -188,15 +98,6 @@ end_per_suite(_Config) ->
|
||||||
emqx_common_test_helpers:ensure_mnesia_stopped(),
|
emqx_common_test_helpers:ensure_mnesia_stopped(),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
end_per_group(gc_tests, Config) ->
|
|
||||||
meck:unload(mnesia),
|
|
||||||
?config(store_owner, Config) ! stop,
|
|
||||||
ok;
|
|
||||||
end_per_group(Group, _Config) when
|
|
||||||
Group =:= ram_tables; Group =:= disc_tables
|
|
||||||
->
|
|
||||||
meck:unload(emqx_config),
|
|
||||||
emqx_common_test_helpers:stop_apps([]);
|
|
||||||
end_per_group(persistent_store_disabled, _Config) ->
|
end_per_group(persistent_store_disabled, _Config) ->
|
||||||
meck:unload(emqx_config),
|
meck:unload(emqx_config),
|
||||||
emqx_common_test_helpers:stop_apps([]);
|
emqx_common_test_helpers:stop_apps([]);
|
||||||
|
@ -205,23 +106,12 @@ end_per_group(_Group, _Config) ->
|
||||||
|
|
||||||
init_per_testcase(TestCase, Config) ->
|
init_per_testcase(TestCase, Config) ->
|
||||||
Config1 = preconfig_per_testcase(TestCase, Config),
|
Config1 = preconfig_per_testcase(TestCase, Config),
|
||||||
case is_gc_tc(TestCase) of
|
|
||||||
true ->
|
|
||||||
ets:delete_all_objects(?config(msg_store, Config)),
|
|
||||||
ets:delete_all_objects(?config(session_msg_store, Config));
|
|
||||||
false ->
|
|
||||||
skip
|
|
||||||
end,
|
|
||||||
case erlang:function_exported(?MODULE, TestCase, 2) of
|
case erlang:function_exported(?MODULE, TestCase, 2) of
|
||||||
true -> ?MODULE:TestCase(init, Config1);
|
true -> ?MODULE:TestCase(init, Config1);
|
||||||
_ -> Config1
|
_ -> Config1
|
||||||
end.
|
end.
|
||||||
|
|
||||||
end_per_testcase(TestCase, Config) ->
|
end_per_testcase(TestCase, Config) ->
|
||||||
case is_snabbkaffe_tc(TestCase) of
|
|
||||||
true -> snabbkaffe:stop();
|
|
||||||
false -> skip
|
|
||||||
end,
|
|
||||||
case erlang:function_exported(?MODULE, TestCase, 2) of
|
case erlang:function_exported(?MODULE, TestCase, 2) of
|
||||||
true -> ?MODULE:TestCase('end', Config);
|
true -> ?MODULE:TestCase('end', Config);
|
||||||
false -> ok
|
false -> ok
|
||||||
|
@ -307,20 +197,6 @@ wait_for_cm_unregister(ClientId, N) ->
|
||||||
wait_for_cm_unregister(ClientId, N - 1)
|
wait_for_cm_unregister(ClientId, N - 1)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
snabbkaffe_sync_publish(Topic, Payloads) ->
|
|
||||||
Fun = fun(Client, Payload) ->
|
|
||||||
?check_trace(
|
|
||||||
begin
|
|
||||||
?wait_async_action(
|
|
||||||
{ok, _} = emqtt:publish(Client, Topic, Payload, 2),
|
|
||||||
#{?snk_kind := ps_persist_msg, payload := Payload}
|
|
||||||
)
|
|
||||||
end,
|
|
||||||
fun(_, _Trace) -> ok end
|
|
||||||
)
|
|
||||||
end,
|
|
||||||
do_publish(Payloads, Fun, true).
|
|
||||||
|
|
||||||
publish(Topic, Payloads) ->
|
publish(Topic, Payloads) ->
|
||||||
publish(Topic, Payloads, false).
|
publish(Topic, Payloads, false).
|
||||||
|
|
||||||
|
@ -514,20 +390,6 @@ t_persist_on_disconnect(Config) ->
|
||||||
?assertEqual(0, client_info(session_present, Client2)),
|
?assertEqual(0, client_info(session_present, Client2)),
|
||||||
ok = emqtt:disconnect(Client2).
|
ok = emqtt:disconnect(Client2).
|
||||||
|
|
||||||
wait_for_pending(SId) ->
|
|
||||||
wait_for_pending(SId, 100).
|
|
||||||
|
|
||||||
wait_for_pending(_SId, 0) ->
|
|
||||||
error(exhausted_wait_for_pending);
|
|
||||||
wait_for_pending(SId, N) ->
|
|
||||||
case emqx_persistent_session:pending(SId) of
|
|
||||||
[] ->
|
|
||||||
timer:sleep(1),
|
|
||||||
wait_for_pending(SId, N - 1);
|
|
||||||
[_ | _] = Pending ->
|
|
||||||
Pending
|
|
||||||
end.
|
|
||||||
|
|
||||||
t_process_dies_session_expires(Config) ->
|
t_process_dies_session_expires(Config) ->
|
||||||
%% Emulate an error in the connect process,
|
%% Emulate an error in the connect process,
|
||||||
%% or that the node of the process goes down.
|
%% or that the node of the process goes down.
|
||||||
|
@ -552,36 +414,8 @@ t_process_dies_session_expires(Config) ->
|
||||||
|
|
||||||
ok = publish(Topic, [Payload]),
|
ok = publish(Topic, [Payload]),
|
||||||
|
|
||||||
SessionId =
|
|
||||||
case ?config(persistent_store_enabled, Config) of
|
|
||||||
false ->
|
|
||||||
undefined;
|
|
||||||
true ->
|
|
||||||
%% The session should not be marked as expired.
|
|
||||||
{Tag, Session} = emqx_persistent_session:lookup(ClientId),
|
|
||||||
?assertEqual(persistent, Tag),
|
|
||||||
SId = emqx_session:info(id, Session),
|
|
||||||
case ?config(kill_connection_process, Config) of
|
|
||||||
true ->
|
|
||||||
%% The session should have a pending message
|
|
||||||
?assertMatch([_], wait_for_pending(SId));
|
|
||||||
false ->
|
|
||||||
skip
|
|
||||||
end,
|
|
||||||
SId
|
|
||||||
end,
|
|
||||||
|
|
||||||
timer:sleep(1100),
|
timer:sleep(1100),
|
||||||
|
|
||||||
%% The session should now be marked as expired.
|
|
||||||
case
|
|
||||||
(?config(kill_connection_process, Config) andalso
|
|
||||||
?config(persistent_store_enabled, Config))
|
|
||||||
of
|
|
||||||
true -> ?assertMatch({expired, _}, emqx_persistent_session:lookup(ClientId));
|
|
||||||
false -> skip
|
|
||||||
end,
|
|
||||||
|
|
||||||
{ok, Client2} = emqtt:start_link([
|
{ok, Client2} = emqtt:start_link([
|
||||||
{proto_ver, v5},
|
{proto_ver, v5},
|
||||||
{clientid, ClientId},
|
{clientid, ClientId},
|
||||||
|
@ -592,21 +426,6 @@ t_process_dies_session_expires(Config) ->
|
||||||
{ok, _} = emqtt:ConnFun(Client2),
|
{ok, _} = emqtt:ConnFun(Client2),
|
||||||
?assertEqual(0, client_info(session_present, Client2)),
|
?assertEqual(0, client_info(session_present, Client2)),
|
||||||
|
|
||||||
case
|
|
||||||
(?config(kill_connection_process, Config) andalso
|
|
||||||
?config(persistent_store_enabled, Config))
|
|
||||||
of
|
|
||||||
true ->
|
|
||||||
%% The session should be a fresh one
|
|
||||||
{persistent, NewSession} = emqx_persistent_session:lookup(ClientId),
|
|
||||||
?assertNotEqual(SessionId, emqx_session:info(id, NewSession)),
|
|
||||||
%% The old session should now either
|
|
||||||
%% be marked as abandoned or already be garbage collected.
|
|
||||||
?assertMatch([], emqx_persistent_session:pending(SessionId));
|
|
||||||
false ->
|
|
||||||
skip
|
|
||||||
end,
|
|
||||||
|
|
||||||
%% We should not receive the pending message
|
%% We should not receive the pending message
|
||||||
?assertEqual([], receive_messages(1)),
|
?assertEqual([], receive_messages(1)),
|
||||||
|
|
||||||
|
@ -724,7 +543,6 @@ t_clean_start_drops_subscriptions(Config) ->
|
||||||
|
|
||||||
t_unsubscribe(Config) ->
|
t_unsubscribe(Config) ->
|
||||||
ConnFun = ?config(conn_fun, Config),
|
ConnFun = ?config(conn_fun, Config),
|
||||||
Topic = ?config(topic, Config),
|
|
||||||
STopic = ?config(stopic, Config),
|
STopic = ?config(stopic, Config),
|
||||||
ClientId = ?config(client_id, Config),
|
ClientId = ?config(client_id, Config),
|
||||||
{ok, Client} = emqtt:start_link([
|
{ok, Client} = emqtt:start_link([
|
||||||
|
@ -735,22 +553,9 @@ t_unsubscribe(Config) ->
|
||||||
]),
|
]),
|
||||||
{ok, _} = emqtt:ConnFun(Client),
|
{ok, _} = emqtt:ConnFun(Client),
|
||||||
{ok, _, [2]} = emqtt:subscribe(Client, STopic, qos2),
|
{ok, _, [2]} = emqtt:subscribe(Client, STopic, qos2),
|
||||||
case emqx_persistent_session:is_store_enabled() of
|
?assertMatch([_], [Sub || {ST, _} = Sub <- emqtt:subscriptions(Client), ST =:= STopic]),
|
||||||
true ->
|
{ok, _, _} = emqtt:unsubscribe(Client, STopic),
|
||||||
{persistent, Session} = emqx_persistent_session:lookup(ClientId),
|
?assertMatch([], [Sub || {ST, _} = Sub <- emqtt:subscriptions(Client), ST =:= STopic]),
|
||||||
SessionID = emqx_session:info(id, Session),
|
|
||||||
SessionIDs = [SId || #route{dest = SId} <- emqx_session_router:match_routes(Topic)],
|
|
||||||
?assert(lists:member(SessionID, SessionIDs)),
|
|
||||||
?assertMatch([_], [Sub || {ST, _} = Sub <- emqtt:subscriptions(Client), ST =:= STopic]),
|
|
||||||
{ok, _, _} = emqtt:unsubscribe(Client, STopic),
|
|
||||||
?assertMatch([], [Sub || {ST, _} = Sub <- emqtt:subscriptions(Client), ST =:= STopic]),
|
|
||||||
SessionIDs2 = [SId || #route{dest = SId} <- emqx_session_router:match_routes(Topic)],
|
|
||||||
?assert(not lists:member(SessionID, SessionIDs2));
|
|
||||||
false ->
|
|
||||||
?assertMatch([_], [Sub || {ST, _} = Sub <- emqtt:subscriptions(Client), ST =:= STopic]),
|
|
||||||
{ok, _, _} = emqtt:unsubscribe(Client, STopic),
|
|
||||||
?assertMatch([], [Sub || {ST, _} = Sub <- emqtt:subscriptions(Client), ST =:= STopic])
|
|
||||||
end,
|
|
||||||
ok = emqtt:disconnect(Client).
|
ok = emqtt:disconnect(Client).
|
||||||
|
|
||||||
t_multiple_subscription_matches(Config) ->
|
t_multiple_subscription_matches(Config) ->
|
||||||
|
@ -794,515 +599,3 @@ t_multiple_subscription_matches(Config) ->
|
||||||
?assertEqual({ok, 2}, maps:find(qos, Msg1)),
|
?assertEqual({ok, 2}, maps:find(qos, Msg1)),
|
||||||
?assertEqual({ok, 2}, maps:find(qos, Msg2)),
|
?assertEqual({ok, 2}, maps:find(qos, Msg2)),
|
||||||
ok = emqtt:disconnect(Client2).
|
ok = emqtt:disconnect(Client2).
|
||||||
|
|
||||||
t_lost_messages_because_of_gc(init, Config) ->
|
|
||||||
case
|
|
||||||
(emqx_persistent_session:is_store_enabled() andalso
|
|
||||||
?config(kill_connection_process, Config))
|
|
||||||
of
|
|
||||||
true ->
|
|
||||||
Retain = 1000,
|
|
||||||
OldRetain = emqx_config:get(?msg_retain, Retain),
|
|
||||||
emqx_config:put(?msg_retain, Retain),
|
|
||||||
[{retain, Retain}, {old_retain, OldRetain} | Config];
|
|
||||||
false ->
|
|
||||||
{skip, only_relevant_with_store_and_kill_process}
|
|
||||||
end;
|
|
||||||
t_lost_messages_because_of_gc('end', Config) ->
|
|
||||||
OldRetain = ?config(old_retain, Config),
|
|
||||||
emqx_config:put(?msg_retain, OldRetain),
|
|
||||||
ok.
|
|
||||||
|
|
||||||
t_lost_messages_because_of_gc(Config) ->
|
|
||||||
ConnFun = ?config(conn_fun, Config),
|
|
||||||
Topic = ?config(topic, Config),
|
|
||||||
STopic = ?config(stopic, Config),
|
|
||||||
ClientId = ?config(client_id, Config),
|
|
||||||
Retain = ?config(retain, Config),
|
|
||||||
Payload1 = <<"hello1">>,
|
|
||||||
Payload2 = <<"hello2">>,
|
|
||||||
{ok, Client1} = emqtt:start_link([
|
|
||||||
{clientid, ClientId},
|
|
||||||
{proto_ver, v5},
|
|
||||||
{properties, #{'Session-Expiry-Interval' => 30}}
|
|
||||||
| Config
|
|
||||||
]),
|
|
||||||
{ok, _} = emqtt:ConnFun(Client1),
|
|
||||||
{ok, _, [2]} = emqtt:subscribe(Client1, STopic, qos2),
|
|
||||||
emqtt:disconnect(Client1),
|
|
||||||
maybe_kill_connection_process(ClientId, Config),
|
|
||||||
publish(Topic, Payload1),
|
|
||||||
timer:sleep(2 * Retain),
|
|
||||||
publish(Topic, Payload2),
|
|
||||||
emqx_persistent_session_gc:message_gc_worker(),
|
|
||||||
{ok, Client2} = emqtt:start_link([
|
|
||||||
{clientid, ClientId},
|
|
||||||
{clean_start, false},
|
|
||||||
{proto_ver, v5},
|
|
||||||
{properties, #{'Session-Expiry-Interval' => 30}}
|
|
||||||
| Config
|
|
||||||
]),
|
|
||||||
{ok, _} = emqtt:ConnFun(Client2),
|
|
||||||
Msgs = receive_messages(2),
|
|
||||||
?assertMatch([_], Msgs),
|
|
||||||
?assertEqual({ok, iolist_to_binary(Payload2)}, maps:find(payload, hd(Msgs))),
|
|
||||||
emqtt:disconnect(Client2),
|
|
||||||
ok.
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Snabbkaffe helpers
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
check_snabbkaffe_vanilla(Trace) ->
|
|
||||||
ResumeTrace = [
|
|
||||||
T
|
|
||||||
|| #{?snk_kind := K} = T <- Trace,
|
|
||||||
re:run(to_list(K), "^ps_") /= nomatch
|
|
||||||
],
|
|
||||||
?assertMatch([_ | _], ResumeTrace),
|
|
||||||
[_Sid] = lists:usort(?projection(sid, ResumeTrace)),
|
|
||||||
%% Check internal flow of the emqx_cm resuming
|
|
||||||
?assert(
|
|
||||||
?strict_causality(
|
|
||||||
#{?snk_kind := ps_resuming},
|
|
||||||
#{?snk_kind := ps_initial_pendings},
|
|
||||||
ResumeTrace
|
|
||||||
)
|
|
||||||
),
|
|
||||||
?assert(
|
|
||||||
?strict_causality(
|
|
||||||
#{?snk_kind := ps_initial_pendings},
|
|
||||||
#{?snk_kind := ps_persist_pendings},
|
|
||||||
ResumeTrace
|
|
||||||
)
|
|
||||||
),
|
|
||||||
?assert(
|
|
||||||
?strict_causality(
|
|
||||||
#{?snk_kind := ps_persist_pendings},
|
|
||||||
#{?snk_kind := ps_notify_writers},
|
|
||||||
ResumeTrace
|
|
||||||
)
|
|
||||||
),
|
|
||||||
?assert(
|
|
||||||
?strict_causality(
|
|
||||||
#{?snk_kind := ps_notify_writers},
|
|
||||||
#{?snk_kind := ps_node_markers},
|
|
||||||
ResumeTrace
|
|
||||||
)
|
|
||||||
),
|
|
||||||
?assert(
|
|
||||||
?strict_causality(
|
|
||||||
#{?snk_kind := ps_node_markers},
|
|
||||||
#{?snk_kind := ps_resume_session},
|
|
||||||
ResumeTrace
|
|
||||||
)
|
|
||||||
),
|
|
||||||
?assert(
|
|
||||||
?strict_causality(
|
|
||||||
#{?snk_kind := ps_resume_session},
|
|
||||||
#{?snk_kind := ps_marker_pendings},
|
|
||||||
ResumeTrace
|
|
||||||
)
|
|
||||||
),
|
|
||||||
?assert(
|
|
||||||
?strict_causality(
|
|
||||||
#{?snk_kind := ps_marker_pendings},
|
|
||||||
#{?snk_kind := ps_marker_pendings_msgs},
|
|
||||||
ResumeTrace
|
|
||||||
)
|
|
||||||
),
|
|
||||||
?assert(
|
|
||||||
?strict_causality(
|
|
||||||
#{?snk_kind := ps_marker_pendings_msgs},
|
|
||||||
#{?snk_kind := ps_resume_end},
|
|
||||||
ResumeTrace
|
|
||||||
)
|
|
||||||
),
|
|
||||||
|
|
||||||
%% Check flow between worker and emqx_cm
|
|
||||||
?assert(
|
|
||||||
?strict_causality(
|
|
||||||
#{?snk_kind := ps_notify_writers},
|
|
||||||
#{?snk_kind := ps_worker_started},
|
|
||||||
ResumeTrace
|
|
||||||
)
|
|
||||||
),
|
|
||||||
?assert(
|
|
||||||
?strict_causality(
|
|
||||||
#{?snk_kind := ps_marker_pendings},
|
|
||||||
#{?snk_kind := ps_worker_resume_end},
|
|
||||||
ResumeTrace
|
|
||||||
)
|
|
||||||
),
|
|
||||||
?assert(
|
|
||||||
?strict_causality(
|
|
||||||
#{?snk_kind := ps_worker_resume_end},
|
|
||||||
#{?snk_kind := ps_worker_shutdown},
|
|
||||||
ResumeTrace
|
|
||||||
)
|
|
||||||
),
|
|
||||||
|
|
||||||
[Markers] = ?projection(markers, ?of_kind(ps_node_markers, Trace)),
|
|
||||||
?assertMatch([_], Markers).
|
|
||||||
|
|
||||||
to_list(L) when is_list(L) -> L;
|
|
||||||
to_list(A) when is_atom(A) -> atom_to_list(A);
|
|
||||||
to_list(B) when is_binary(B) -> binary_to_list(B).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Snabbkaffe tests
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
t_snabbkaffe_vanilla_stages(Config) ->
|
|
||||||
%% Test that all stages of session resume works ok in the simplest case
|
|
||||||
ConnFun = ?config(conn_fun, Config),
|
|
||||||
ClientId = ?config(client_id, Config),
|
|
||||||
EmqttOpts = [
|
|
||||||
{proto_ver, v5},
|
|
||||||
{clientid, ClientId},
|
|
||||||
{properties, #{'Session-Expiry-Interval' => 30}}
|
|
||||||
| Config
|
|
||||||
],
|
|
||||||
{ok, Client1} = emqtt:start_link([{clean_start, true} | EmqttOpts]),
|
|
||||||
{ok, _} = emqtt:ConnFun(Client1),
|
|
||||||
ok = emqtt:disconnect(Client1),
|
|
||||||
maybe_kill_connection_process(ClientId, Config),
|
|
||||||
|
|
||||||
?check_trace(
|
|
||||||
begin
|
|
||||||
{ok, Client2} = emqtt:start_link([{clean_start, false} | EmqttOpts]),
|
|
||||||
{ok, _} = emqtt:ConnFun(Client2),
|
|
||||||
ok = emqtt:disconnect(Client2)
|
|
||||||
end,
|
|
||||||
fun(ok, Trace) ->
|
|
||||||
check_snabbkaffe_vanilla(Trace)
|
|
||||||
end
|
|
||||||
),
|
|
||||||
ok.
|
|
||||||
|
|
||||||
t_snabbkaffe_pending_messages(Config) ->
|
|
||||||
%% Make sure there are pending messages are fetched during the init stage.
|
|
||||||
ConnFun = ?config(conn_fun, Config),
|
|
||||||
ClientId = ?config(client_id, Config),
|
|
||||||
Topic = ?config(topic, Config),
|
|
||||||
STopic = ?config(stopic, Config),
|
|
||||||
Payloads = [<<"test", (integer_to_binary(X))/binary>> || X <- [1, 2, 3, 4, 5]],
|
|
||||||
EmqttOpts = [
|
|
||||||
{proto_ver, v5},
|
|
||||||
{clientid, ClientId},
|
|
||||||
{properties, #{'Session-Expiry-Interval' => 30}}
|
|
||||||
| Config
|
|
||||||
],
|
|
||||||
{ok, Client1} = emqtt:start_link([{clean_start, true} | EmqttOpts]),
|
|
||||||
{ok, _} = emqtt:ConnFun(Client1),
|
|
||||||
{ok, _, [2]} = emqtt:subscribe(Client1, STopic, qos2),
|
|
||||||
ok = emqtt:disconnect(Client1),
|
|
||||||
maybe_kill_connection_process(ClientId, Config),
|
|
||||||
|
|
||||||
?check_trace(
|
|
||||||
begin
|
|
||||||
snabbkaffe_sync_publish(Topic, Payloads),
|
|
||||||
{ok, Client2} = emqtt:start_link([{clean_start, false} | EmqttOpts]),
|
|
||||||
{ok, _} = emqtt:ConnFun(Client2),
|
|
||||||
Msgs = receive_messages(length(Payloads)),
|
|
||||||
ReceivedPayloads = [P || #{payload := P} <- Msgs],
|
|
||||||
?assertEqual(lists:sort(ReceivedPayloads), lists:sort(Payloads)),
|
|
||||||
ok = emqtt:disconnect(Client2)
|
|
||||||
end,
|
|
||||||
fun(ok, Trace) ->
|
|
||||||
check_snabbkaffe_vanilla(Trace),
|
|
||||||
%% Check that all messages was delivered from the DB
|
|
||||||
[Delivers1] = ?projection(msgs, ?of_kind(ps_persist_pendings_msgs, Trace)),
|
|
||||||
[Delivers2] = ?projection(msgs, ?of_kind(ps_marker_pendings_msgs, Trace)),
|
|
||||||
Delivers = Delivers1 ++ Delivers2,
|
|
||||||
?assertEqual(length(Payloads), length(Delivers)),
|
|
||||||
%% Check for no duplicates
|
|
||||||
?assertEqual(lists:usort(Delivers), lists:sort(Delivers))
|
|
||||||
end
|
|
||||||
),
|
|
||||||
ok.
|
|
||||||
|
|
||||||
t_snabbkaffe_buffered_messages(Config) ->
|
|
||||||
%% Make sure to buffer messages during startup.
|
|
||||||
ConnFun = ?config(conn_fun, Config),
|
|
||||||
ClientId = ?config(client_id, Config),
|
|
||||||
Topic = ?config(topic, Config),
|
|
||||||
STopic = ?config(stopic, Config),
|
|
||||||
Payloads1 = [<<"test", (integer_to_binary(X))/binary>> || X <- [1, 2, 3]],
|
|
||||||
Payloads2 = [<<"test", (integer_to_binary(X))/binary>> || X <- [4, 5, 6]],
|
|
||||||
EmqttOpts = [
|
|
||||||
{proto_ver, v5},
|
|
||||||
{clientid, ClientId},
|
|
||||||
{properties, #{'Session-Expiry-Interval' => 30}}
|
|
||||||
| Config
|
|
||||||
],
|
|
||||||
{ok, Client1} = emqtt:start_link([{clean_start, true} | EmqttOpts]),
|
|
||||||
{ok, _} = emqtt:ConnFun(Client1),
|
|
||||||
{ok, _, [2]} = emqtt:subscribe(Client1, STopic, qos2),
|
|
||||||
ok = emqtt:disconnect(Client1),
|
|
||||||
maybe_kill_connection_process(ClientId, Config),
|
|
||||||
|
|
||||||
publish(Topic, Payloads1),
|
|
||||||
|
|
||||||
?check_trace(
|
|
||||||
begin
|
|
||||||
%% Make the resume init phase wait until the first message is delivered.
|
|
||||||
?force_ordering(
|
|
||||||
#{?snk_kind := ps_worker_deliver},
|
|
||||||
#{?snk_kind := ps_resume_end}
|
|
||||||
),
|
|
||||||
Parent = self(),
|
|
||||||
spawn_link(fun() ->
|
|
||||||
?block_until(#{?snk_kind := ps_marker_pendings_msgs}, infinity, 5000),
|
|
||||||
publish(Topic, Payloads2, true),
|
|
||||||
Parent ! publish_done,
|
|
||||||
ok
|
|
||||||
end),
|
|
||||||
{ok, Client2} = emqtt:start_link([{clean_start, false} | EmqttOpts]),
|
|
||||||
{ok, _} = emqtt:ConnFun(Client2),
|
|
||||||
receive
|
|
||||||
publish_done -> ok
|
|
||||||
after 10000 -> error(too_long_to_publish)
|
|
||||||
end,
|
|
||||||
Msgs = receive_messages(length(Payloads1) + length(Payloads2) + 1),
|
|
||||||
ReceivedPayloads = [P || #{payload := P} <- Msgs],
|
|
||||||
?assertEqual(
|
|
||||||
lists:sort(Payloads1 ++ Payloads2),
|
|
||||||
lists:sort(ReceivedPayloads)
|
|
||||||
),
|
|
||||||
ok = emqtt:disconnect(Client2)
|
|
||||||
end,
|
|
||||||
fun(ok, Trace) ->
|
|
||||||
check_snabbkaffe_vanilla(Trace),
|
|
||||||
%% Check that some messages was buffered in the writer process
|
|
||||||
[Msgs] = ?projection(msgs, ?of_kind(ps_writer_pendings, Trace)),
|
|
||||||
?assertMatch(
|
|
||||||
X when 0 < X andalso X =< length(Payloads2),
|
|
||||||
length(Msgs)
|
|
||||||
)
|
|
||||||
end
|
|
||||||
),
|
|
||||||
ok.
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% GC tests
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
-define(MARKER, 3).
|
|
||||||
-define(DELIVERED, 2).
|
|
||||||
-define(UNDELIVERED, 1).
|
|
||||||
-define(ABANDONED, 0).
|
|
||||||
|
|
||||||
msg_id() ->
|
|
||||||
emqx_guid:gen().
|
|
||||||
|
|
||||||
delivered_msg(MsgId, SessionID, STopic) ->
|
|
||||||
{SessionID, MsgId, STopic, ?DELIVERED}.
|
|
||||||
|
|
||||||
undelivered_msg(MsgId, SessionID, STopic) ->
|
|
||||||
{SessionID, MsgId, STopic, ?UNDELIVERED}.
|
|
||||||
|
|
||||||
marker_msg(MarkerID, SessionID) ->
|
|
||||||
{SessionID, MarkerID, <<>>, ?MARKER}.
|
|
||||||
|
|
||||||
guid(MicrosecondsAgo) ->
|
|
||||||
%% Make a fake GUID and set a timestamp.
|
|
||||||
<<TS:64, Tail:64>> = emqx_guid:gen(),
|
|
||||||
<<(TS - MicrosecondsAgo):64, Tail:64>>.
|
|
||||||
|
|
||||||
abandoned_session_msg(SessionID) ->
|
|
||||||
abandoned_session_msg(SessionID, 0).
|
|
||||||
|
|
||||||
abandoned_session_msg(SessionID, MicrosecondsAgo) ->
|
|
||||||
TS = erlang:system_time(microsecond),
|
|
||||||
{SessionID, <<>>, <<(TS - MicrosecondsAgo):64>>, ?ABANDONED}.
|
|
||||||
|
|
||||||
fresh_gc_delete_fun() ->
|
|
||||||
Ets = ets:new(gc_collect, [ordered_set]),
|
|
||||||
fun
|
|
||||||
(delete, Key) ->
|
|
||||||
ets:insert(Ets, {Key}),
|
|
||||||
ok;
|
|
||||||
(collect, <<>>) ->
|
|
||||||
List = ets:match(Ets, {'$1'}),
|
|
||||||
ets:delete(Ets),
|
|
||||||
lists:append(List);
|
|
||||||
(_, _Key) ->
|
|
||||||
ok
|
|
||||||
end.
|
|
||||||
|
|
||||||
fresh_gc_callbacks_fun() ->
|
|
||||||
Ets = ets:new(gc_collect, [ordered_set]),
|
|
||||||
fun
|
|
||||||
(collect, <<>>) ->
|
|
||||||
List = ets:match(Ets, {'$1'}),
|
|
||||||
ets:delete(Ets),
|
|
||||||
lists:append(List);
|
|
||||||
(Tag, Key) ->
|
|
||||||
ets:insert(Ets, {{Key, Tag}}),
|
|
||||||
ok
|
|
||||||
end.
|
|
||||||
|
|
||||||
get_gc_delete_messages() ->
|
|
||||||
Fun = fresh_gc_delete_fun(),
|
|
||||||
emqx_persistent_session:gc_session_messages(Fun),
|
|
||||||
Fun(collect, <<>>).
|
|
||||||
|
|
||||||
get_gc_callbacks() ->
|
|
||||||
Fun = fresh_gc_callbacks_fun(),
|
|
||||||
emqx_persistent_session:gc_session_messages(Fun),
|
|
||||||
Fun(collect, <<>>).
|
|
||||||
|
|
||||||
t_gc_all_delivered(Config) ->
|
|
||||||
Store = ?config(session_msg_store, Config),
|
|
||||||
STopic = ?config(stopic, Config),
|
|
||||||
SessionId = emqx_guid:gen(),
|
|
||||||
MsgIds = [msg_id() || _ <- lists:seq(1, 5)],
|
|
||||||
Delivered = [delivered_msg(X, SessionId, STopic) || X <- MsgIds],
|
|
||||||
Undelivered = [undelivered_msg(X, SessionId, STopic) || X <- MsgIds],
|
|
||||||
SortedContent = lists:usort(Delivered ++ Undelivered),
|
|
||||||
ets:insert(Store, [{X, <<>>} || X <- SortedContent]),
|
|
||||||
GCMessages = get_gc_delete_messages(),
|
|
||||||
?assertEqual(SortedContent, GCMessages),
|
|
||||||
ok.
|
|
||||||
|
|
||||||
t_gc_some_undelivered(Config) ->
|
|
||||||
Store = ?config(session_msg_store, Config),
|
|
||||||
STopic = ?config(stopic, Config),
|
|
||||||
SessionId = emqx_guid:gen(),
|
|
||||||
MsgIds = [msg_id() || _ <- lists:seq(1, 10)],
|
|
||||||
Delivered = [delivered_msg(X, SessionId, STopic) || X <- MsgIds],
|
|
||||||
{Delivered1, _Delivered2} = split(Delivered),
|
|
||||||
Undelivered = [undelivered_msg(X, SessionId, STopic) || X <- MsgIds],
|
|
||||||
{Undelivered1, Undelivered2} = split(Undelivered),
|
|
||||||
Content = Delivered1 ++ Undelivered1 ++ Undelivered2,
|
|
||||||
ets:insert(Store, [{X, <<>>} || X <- Content]),
|
|
||||||
Expected = lists:usort(Delivered1 ++ Undelivered1),
|
|
||||||
GCMessages = get_gc_delete_messages(),
|
|
||||||
?assertEqual(Expected, GCMessages),
|
|
||||||
ok.
|
|
||||||
|
|
||||||
t_gc_with_markers(Config) ->
|
|
||||||
Store = ?config(session_msg_store, Config),
|
|
||||||
STopic = ?config(stopic, Config),
|
|
||||||
SessionId = emqx_guid:gen(),
|
|
||||||
MsgIds1 = [msg_id() || _ <- lists:seq(1, 10)],
|
|
||||||
MarkerId = msg_id(),
|
|
||||||
MsgIds = [msg_id() || _ <- lists:seq(1, 4)] ++ MsgIds1,
|
|
||||||
Delivered = [delivered_msg(X, SessionId, STopic) || X <- MsgIds],
|
|
||||||
{Delivered1, _Delivered2} = split(Delivered),
|
|
||||||
Undelivered = [undelivered_msg(X, SessionId, STopic) || X <- MsgIds],
|
|
||||||
{Undelivered1, Undelivered2} = split(Undelivered),
|
|
||||||
Markers = [marker_msg(MarkerId, SessionId)],
|
|
||||||
Content = Delivered1 ++ Undelivered1 ++ Undelivered2 ++ Markers,
|
|
||||||
ets:insert(Store, [{X, <<>>} || X <- Content]),
|
|
||||||
Expected = lists:usort(Delivered1 ++ Undelivered1),
|
|
||||||
GCMessages = get_gc_delete_messages(),
|
|
||||||
?assertEqual(Expected, GCMessages),
|
|
||||||
ok.
|
|
||||||
|
|
||||||
t_gc_abandoned_some_undelivered(Config) ->
|
|
||||||
Store = ?config(session_msg_store, Config),
|
|
||||||
STopic = ?config(stopic, Config),
|
|
||||||
SessionId = emqx_guid:gen(),
|
|
||||||
MsgIds = [msg_id() || _ <- lists:seq(1, 10)],
|
|
||||||
Delivered = [delivered_msg(X, SessionId, STopic) || X <- MsgIds],
|
|
||||||
{Delivered1, _Delivered2} = split(Delivered),
|
|
||||||
Undelivered = [undelivered_msg(X, SessionId, STopic) || X <- MsgIds],
|
|
||||||
{Undelivered1, Undelivered2} = split(Undelivered),
|
|
||||||
Abandoned = abandoned_session_msg(SessionId),
|
|
||||||
Content = Delivered1 ++ Undelivered1 ++ Undelivered2 ++ [Abandoned],
|
|
||||||
ets:insert(Store, [{X, <<>>} || X <- Content]),
|
|
||||||
Expected = lists:usort(Delivered1 ++ Undelivered1 ++ Undelivered2),
|
|
||||||
GCMessages = get_gc_delete_messages(),
|
|
||||||
?assertEqual(Expected, GCMessages),
|
|
||||||
ok.
|
|
||||||
|
|
||||||
t_gc_abandoned_only_called_on_empty_session(Config) ->
|
|
||||||
Store = ?config(session_msg_store, Config),
|
|
||||||
STopic = ?config(stopic, Config),
|
|
||||||
SessionId = emqx_guid:gen(),
|
|
||||||
MsgIds = [msg_id() || _ <- lists:seq(1, 10)],
|
|
||||||
Delivered = [delivered_msg(X, SessionId, STopic) || X <- MsgIds],
|
|
||||||
Undelivered = [undelivered_msg(X, SessionId, STopic) || X <- MsgIds],
|
|
||||||
Abandoned = abandoned_session_msg(SessionId),
|
|
||||||
Content = Delivered ++ Undelivered ++ [Abandoned],
|
|
||||||
ets:insert(Store, [{X, <<>>} || X <- Content]),
|
|
||||||
GCMessages = get_gc_callbacks(),
|
|
||||||
|
|
||||||
%% Since we had messages to delete, we don't expect to get the
|
|
||||||
%% callback on the abandoned session
|
|
||||||
?assertEqual([], [X || {X, abandoned} <- GCMessages]),
|
|
||||||
|
|
||||||
%% But if we have only the abandoned session marker for this
|
|
||||||
%% session, it should be called.
|
|
||||||
ets:delete_all_objects(Store),
|
|
||||||
UndeliveredOtherSession = undelivered_msg(msg_id(), emqx_guid:gen(), <<"topic">>),
|
|
||||||
ets:insert(Store, [{X, <<>>} || X <- [Abandoned, UndeliveredOtherSession]]),
|
|
||||||
GCMessages2 = get_gc_callbacks(),
|
|
||||||
?assertEqual([Abandoned], [X || {X, abandoned} <- GCMessages2]),
|
|
||||||
ok.
|
|
||||||
|
|
||||||
t_gc_session_gc_worker(init, Config) ->
|
|
||||||
meck:new(emqx_persistent_session, [passthrough, no_link]),
|
|
||||||
Config;
|
|
||||||
t_gc_session_gc_worker('end', _Config) ->
|
|
||||||
meck:unload(emqx_persistent_session),
|
|
||||||
ok.
|
|
||||||
|
|
||||||
t_gc_session_gc_worker(Config) ->
|
|
||||||
STopic = ?config(stopic, Config),
|
|
||||||
SessionID = emqx_guid:gen(),
|
|
||||||
MsgDeleted = delivered_msg(msg_id(), SessionID, STopic),
|
|
||||||
MarkerNotDeleted = marker_msg(msg_id(), SessionID),
|
|
||||||
MarkerDeleted = marker_msg(guid(120 * 1000 * 1000), SessionID),
|
|
||||||
AbandonedNotDeleted = abandoned_session_msg(SessionID),
|
|
||||||
AbandonedDeleted = abandoned_session_msg(SessionID, 500 * 1000 * 1000),
|
|
||||||
meck:expect(emqx_persistent_session, delete_session_message, fun(_Key) -> ok end),
|
|
||||||
emqx_persistent_session_gc:session_gc_worker(delete, MsgDeleted),
|
|
||||||
emqx_persistent_session_gc:session_gc_worker(marker, MarkerNotDeleted),
|
|
||||||
emqx_persistent_session_gc:session_gc_worker(marker, MarkerDeleted),
|
|
||||||
emqx_persistent_session_gc:session_gc_worker(abandoned, AbandonedDeleted),
|
|
||||||
emqx_persistent_session_gc:session_gc_worker(abandoned, AbandonedNotDeleted),
|
|
||||||
History = meck:history(emqx_persistent_session, self()),
|
|
||||||
DeleteCalls = [
|
|
||||||
Key
|
|
||||||
|| {_Pid, {_, delete_session_message, [Key]}, _Result} <-
|
|
||||||
History
|
|
||||||
],
|
|
||||||
?assertEqual(
|
|
||||||
lists:sort([MsgDeleted, AbandonedDeleted, MarkerDeleted]),
|
|
||||||
lists:sort(DeleteCalls)
|
|
||||||
),
|
|
||||||
ok.
|
|
||||||
|
|
||||||
t_gc_message_gc(Config) ->
|
|
||||||
Topic = ?config(topic, Config),
|
|
||||||
ClientID = ?config(client_id, Config),
|
|
||||||
Store = ?config(msg_store, Config),
|
|
||||||
NewMsgs = [
|
|
||||||
emqx_message:make(ClientID, Topic, integer_to_binary(P))
|
|
||||||
|| P <- lists:seq(6, 10)
|
|
||||||
],
|
|
||||||
Retain = 60 * 1000,
|
|
||||||
emqx_config:put(?msg_retain, Retain),
|
|
||||||
Msgs1 = [
|
|
||||||
emqx_message:make(ClientID, Topic, integer_to_binary(P))
|
|
||||||
|| P <- lists:seq(1, 5)
|
|
||||||
],
|
|
||||||
OldMsgs = [M#message{id = guid(Retain * 1000)} || M <- Msgs1],
|
|
||||||
ets:insert(Store, NewMsgs ++ OldMsgs),
|
|
||||||
?assertEqual(lists:sort(OldMsgs ++ NewMsgs), ets:tab2list(Store)),
|
|
||||||
ok = emqx_persistent_session_gc:message_gc_worker(),
|
|
||||||
?assertEqual(lists:sort(NewMsgs), ets:tab2list(Store)),
|
|
||||||
ok.
|
|
||||||
|
|
||||||
split(List) ->
|
|
||||||
split(List, [], []).
|
|
||||||
|
|
||||||
split([], L1, L2) ->
|
|
||||||
{L1, L2};
|
|
||||||
split([H], L1, L2) ->
|
|
||||||
{[H | L1], L2};
|
|
||||||
split([H1, H2 | Left], L1, L2) ->
|
|
||||||
split(Left, [H1 | L1], [H2 | L2]).
|
|
||||||
|
|
|
@ -29,16 +29,6 @@ init_per_suite(Config) ->
|
||||||
end_per_suite(_Config) ->
|
end_per_suite(_Config) ->
|
||||||
emqx_common_test_helpers:stop_apps([emqx_eviction_agent, emqx_conf]).
|
emqx_common_test_helpers:stop_apps([emqx_eviction_agent, emqx_conf]).
|
||||||
|
|
||||||
init_per_testcase(t_persistence, _Config) ->
|
|
||||||
{skip, "Existing session persistence implementation is being phased out"};
|
|
||||||
init_per_testcase(_TestCase, Config) ->
|
|
||||||
Config.
|
|
||||||
|
|
||||||
end_per_testcase(t_persistence, Config) ->
|
|
||||||
Config;
|
|
||||||
end_per_testcase(_TestCase, _Config) ->
|
|
||||||
ok.
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Tests
|
%% Tests
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -199,40 +189,6 @@ t_get_connected_client_count(_Config) ->
|
||||||
emqx_cm:get_connected_client_count()
|
emqx_cm:get_connected_client_count()
|
||||||
).
|
).
|
||||||
|
|
||||||
t_persistence(_Config) ->
|
|
||||||
erlang:process_flag(trap_exit, true),
|
|
||||||
|
|
||||||
Topic = <<"t1">>,
|
|
||||||
Message = <<"message_to_persist">>,
|
|
||||||
|
|
||||||
{ok, C0} = emqtt_connect(?CLIENT_ID, false),
|
|
||||||
{ok, _, _} = emqtt:subscribe(C0, Topic, 0),
|
|
||||||
|
|
||||||
Opts = evict_session_opts(?CLIENT_ID),
|
|
||||||
{ok, Pid} = emqx_eviction_agent_channel:start_supervised(Opts),
|
|
||||||
|
|
||||||
{ok, C1} = emqtt_connect(),
|
|
||||||
{ok, _} = emqtt:publish(C1, Topic, Message, 1),
|
|
||||||
ok = emqtt:disconnect(C1),
|
|
||||||
|
|
||||||
%% Kill channel so that the session is only persisted
|
|
||||||
ok = emqx_eviction_agent_channel:call(Pid, kick),
|
|
||||||
|
|
||||||
%% Should restore session from persistents storage and receive messages
|
|
||||||
{ok, C2} = emqtt_connect(?CLIENT_ID, false),
|
|
||||||
|
|
||||||
receive
|
|
||||||
{publish, #{
|
|
||||||
payload := Message,
|
|
||||||
topic := Topic
|
|
||||||
}} ->
|
|
||||||
ok
|
|
||||||
after 1000 ->
|
|
||||||
ct:fail("message not received")
|
|
||||||
end,
|
|
||||||
|
|
||||||
ok = emqtt:disconnect(C2).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Helpers
|
%% Helpers
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
Loading…
Reference in New Issue