diff --git a/apps/emqx/priv/bpapi.versions b/apps/emqx/priv/bpapi.versions index 876fe66e0..12fa9625e 100644 --- a/apps/emqx/priv/bpapi.versions +++ b/apps/emqx/priv/bpapi.versions @@ -46,7 +46,6 @@ {emqx_node_rebalance_purge,1}. {emqx_node_rebalance_status,1}. {emqx_node_rebalance_status,2}. -{emqx_persistent_session,1}. {emqx_persistent_session_ds,1}. {emqx_plugins,1}. {emqx_prometheus,1}. diff --git a/apps/emqx/src/emqx_app.erl b/apps/emqx/src/emqx_app.erl index 0f4987085..5f2605707 100644 --- a/apps/emqx/src/emqx_app.erl +++ b/apps/emqx/src/emqx_app.erl @@ -38,7 +38,6 @@ start(_Type, _Args) -> ok = maybe_load_config(), - ok = emqx_persistent_session:init_db_backend(), _ = emqx_persistent_session_ds:init(), ok = maybe_start_quicer(), ok = emqx_bpapi:start(), diff --git a/apps/emqx/src/emqx_session_router.erl b/apps/emqx/src/emqx_session_router.erl deleted file mode 100644 index 25484bdf0..000000000 --- a/apps/emqx/src/emqx_session_router.erl +++ /dev/null @@ -1,306 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_session_router). - --behaviour(gen_server). - --include("emqx.hrl"). --include("logger.hrl"). --include("types.hrl"). --include("persistent_session/emqx_persistent_session.hrl"). - --include_lib("snabbkaffe/include/snabbkaffe.hrl"). - --export([ - create_init_tab/0, - create_router_tab/1, - start_link/2 -]). - -%% Route APIs --export([ - delete_routes/2, - do_add_route/2, - do_delete_route/2, - match_routes/1 -]). - --export([ - buffer/3, - pending/2, - resume_begin/2, - resume_end/2 -]). - --export([print_routes/1]). - -%% gen_server callbacks --export([ - init/1, - handle_call/3, - handle_cast/2, - handle_info/2, - terminate/2, - code_change/3 -]). - --type dest() :: node() | {emqx_types:group(), node()}. - --define(ROUTE_RAM_TAB, emqx_session_route_ram). --define(ROUTE_DISC_TAB, emqx_session_route_disc). - --define(SESSION_INIT_TAB, session_init_tab). - -%%-------------------------------------------------------------------- -%% Mnesia bootstrap -%%-------------------------------------------------------------------- - -create_router_tab(disc) -> - create_table(?ROUTE_DISC_TAB, disc_copies); -create_router_tab(ram) -> - create_table(?ROUTE_RAM_TAB, ram_copies). - -create_table(Tab, Storage) -> - ok = mria:create_table(Tab, [ - {type, bag}, - {rlog_shard, ?ROUTE_SHARD}, - {storage, Storage}, - {record_name, route}, - {attributes, record_info(fields, route)}, - {storage_properties, [ - {ets, [ - {read_concurrency, true}, - {write_concurrency, true} - ]} - ]} - ]). - -%%-------------------------------------------------------------------- -%% Start a router -%%-------------------------------------------------------------------- - -create_init_tab() -> - emqx_utils_ets:new(?SESSION_INIT_TAB, [ - public, - {read_concurrency, true}, - {write_concurrency, true} - ]). - --spec start_link(atom(), pos_integer()) -> startlink_ret(). -start_link(Pool, Id) -> - gen_server:start_link( - {local, emqx_utils:proc_name(?MODULE, Id)}, - ?MODULE, - [Pool, Id], - [{hibernate_after, 1000}] - ). - -%%-------------------------------------------------------------------- -%% Route APIs -%%-------------------------------------------------------------------- - --spec do_add_route(emqx_types:topic(), dest()) -> ok | {error, term()}. -do_add_route(Topic, SessionID) when is_binary(Topic) -> - Route = #route{topic = Topic, dest = SessionID}, - case lists:member(Route, lookup_routes(Topic)) of - true -> - ok; - false -> - case emqx_topic:wildcard(Topic) of - true -> - Fun = fun emqx_router_utils:insert_session_trie_route/2, - emqx_router_utils:maybe_trans( - Fun, - [route_tab(), Route], - ?PERSISTENT_SESSION_SHARD - ); - false -> - emqx_router_utils:insert_direct_route(route_tab(), Route) - end - end. - -%% @doc Match routes --spec match_routes(emqx_types:topic()) -> [emqx_types:route()]. -match_routes(Topic) when is_binary(Topic) -> - case match_trie(Topic) of - [] -> lookup_routes(Topic); - Matched -> lists:append([lookup_routes(To) || To <- [Topic | Matched]]) - end. - -%% Optimize: routing table will be replicated to all router nodes. -match_trie(Topic) -> - case emqx_trie:empty_session() of - true -> []; - false -> emqx_trie:match_session(Topic) - end. - -%% Async -delete_routes(SessionID, Subscriptions) -> - cast(pick(SessionID), {delete_routes, SessionID, Subscriptions}). - --spec do_delete_route(emqx_types:topic(), dest()) -> ok | {error, term()}. -do_delete_route(Topic, SessionID) -> - Route = #route{topic = Topic, dest = SessionID}, - case emqx_topic:wildcard(Topic) of - true -> - Fun = fun emqx_router_utils:delete_session_trie_route/2, - emqx_router_utils:maybe_trans(Fun, [route_tab(), Route], ?PERSISTENT_SESSION_SHARD); - false -> - emqx_router_utils:delete_direct_route(route_tab(), Route) - end. - -%% @doc Print routes to a topic --spec print_routes(emqx_types:topic()) -> ok. -print_routes(Topic) -> - lists:foreach( - fun(#route{topic = To, dest = SessionID}) -> - io:format("~s -> ~p~n", [To, SessionID]) - end, - match_routes(Topic) - ). - -%%-------------------------------------------------------------------- -%% Session APIs -%%-------------------------------------------------------------------- - -pending(SessionID, MarkerIDs) -> - call(pick(SessionID), {pending, SessionID, MarkerIDs}). - -buffer(SessionID, STopic, Msg) -> - case emqx_utils_ets:lookup_value(?SESSION_INIT_TAB, SessionID) of - undefined -> ok; - Worker -> emqx_session_router_worker:buffer(Worker, STopic, Msg) - end. - --spec resume_begin(pid(), binary()) -> [{node(), emqx_guid:guid()}]. -resume_begin(From, SessionID) when is_pid(From), is_binary(SessionID) -> - call(pick(SessionID), {resume_begin, From, SessionID}). - --spec resume_end(pid(), binary()) -> - {'ok', [emqx_types:message()]} | {'error', term()}. -resume_end(From, SessionID) when is_pid(From), is_binary(SessionID) -> - case emqx_utils_ets:lookup_value(?SESSION_INIT_TAB, SessionID) of - undefined -> - ?tp(ps_session_not_found, #{sid => SessionID}), - {error, not_found}; - Pid -> - Res = emqx_session_router_worker:resume_end(From, Pid, SessionID), - cast(pick(SessionID), {resume_end, SessionID, Pid}), - Res - end. - -%%-------------------------------------------------------------------- -%% Worker internals -%%-------------------------------------------------------------------- - -call(Router, Msg) -> - gen_server:call(Router, Msg, infinity). - -cast(Router, Msg) -> - gen_server:cast(Router, Msg). - -pick(#route{dest = SessionID}) -> - gproc_pool:pick_worker(session_router_pool, SessionID); -pick(SessionID) when is_binary(SessionID) -> - gproc_pool:pick_worker(session_router_pool, SessionID). - -%%-------------------------------------------------------------------- -%% gen_server callbacks -%%-------------------------------------------------------------------- - -init([Pool, Id]) -> - true = gproc_pool:connect_worker(Pool, {Pool, Id}), - {ok, #{pool => Pool, id => Id, pmon => emqx_pmon:new()}}. - -handle_call({resume_begin, RemotePid, SessionID}, _From, State) -> - case init_resume_worker(RemotePid, SessionID, State) of - error -> - {reply, error, State}; - {ok, Pid, State1} -> - ets:insert(?SESSION_INIT_TAB, {SessionID, Pid}), - MarkerID = emqx_persistent_session:mark_resume_begin(SessionID), - {reply, {ok, MarkerID}, State1} - end; -handle_call({pending, SessionID, MarkerIDs}, _From, State) -> - Res = emqx_persistent_session:pending_messages_in_db(SessionID, MarkerIDs), - {reply, Res, State}; -handle_call(Req, _From, State) -> - ?SLOG(error, #{msg => "unexpected_call", req => Req}), - {reply, ignored, State}. - -handle_cast({delete_routes, SessionID, Subscriptions}, State) -> - %% TODO: Make a batch for deleting all routes. - Fun = fun(Topic, _) -> do_delete_route(Topic, SessionID) end, - ok = maps:foreach(Fun, Subscriptions), - {noreply, State}; -handle_cast({resume_end, SessionID, Pid}, State) -> - case emqx_utils_ets:lookup_value(?SESSION_INIT_TAB, SessionID) of - undefined -> skip; - P when P =:= Pid -> ets:delete(?SESSION_INIT_TAB, SessionID); - P when is_pid(P) -> skip - end, - Pmon = emqx_pmon:demonitor(Pid, maps:get(pmon, State)), - _ = emqx_session_router_worker_sup:abort_worker(Pid), - {noreply, State#{pmon => Pmon}}; -handle_cast(Msg, State) -> - ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}), - {noreply, State}. - -handle_info(Info, State) -> - ?SLOG(error, #{msg => "unexpected_info", info => Info}), - {noreply, State}. - -terminate(_Reason, #{pool := Pool, id := Id}) -> - gproc_pool:disconnect_worker(Pool, {Pool, Id}). - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%%-------------------------------------------------------------------- -%% Resume worker. A process that buffers the persisted messages during -%% initialisation of a resuming session. -%%-------------------------------------------------------------------- - -init_resume_worker(RemotePid, SessionID, #{pmon := Pmon} = State) -> - case emqx_session_router_worker_sup:start_worker(SessionID, RemotePid) of - {error, What} -> - ?SLOG(error, #{msg => "failed_to_start_resume_worker", reason => What}), - error; - {ok, Pid} -> - Pmon1 = emqx_pmon:monitor(Pid, Pmon), - case emqx_utils_ets:lookup_value(?SESSION_INIT_TAB, SessionID) of - undefined -> - {ok, Pid, State#{pmon => Pmon1}}; - {_, OldPid} -> - Pmon2 = emqx_pmon:demonitor(OldPid, Pmon1), - _ = emqx_session_router_worker_sup:abort_worker(OldPid), - {ok, Pid, State#{pmon => Pmon2}} - end - end. - -%%-------------------------------------------------------------------- -%% Internal functions -%%-------------------------------------------------------------------- - -lookup_routes(Topic) -> - ets:lookup(route_tab(), Topic). - -route_tab() -> - case emqx_persistent_session:storage_type() of - disc -> ?ROUTE_DISC_TAB; - ram -> ?ROUTE_RAM_TAB - end. diff --git a/apps/emqx/src/emqx_sup.erl b/apps/emqx/src/emqx_sup.erl index 1893dba86..65742a234 100644 --- a/apps/emqx/src/emqx_sup.erl +++ b/apps/emqx/src/emqx_sup.erl @@ -67,13 +67,11 @@ init([]) -> KernelSup = child_spec(emqx_kernel_sup, supervisor), RouterSup = child_spec(emqx_router_sup, supervisor), BrokerSup = child_spec(emqx_broker_sup, supervisor), - SessionSup = child_spec(emqx_persistent_session_sup, supervisor), CMSup = child_spec(emqx_cm_sup, supervisor), SysSup = child_spec(emqx_sys_sup, supervisor), Limiter = child_spec(emqx_limiter_sup, supervisor), Children = [KernelSup] ++ - [SessionSup || emqx_persistent_session:is_store_enabled()] ++ [RouterSup || emqx_boot:is_enabled(broker)] ++ [BrokerSup || emqx_boot:is_enabled(broker)] ++ [CMSup || emqx_boot:is_enabled(broker)] ++ diff --git a/apps/emqx/src/persistent_session/emqx_persistent_session.erl b/apps/emqx/src/persistent_session/emqx_persistent_session.erl deleted file mode 100644 index d85e13d67..000000000 --- a/apps/emqx/src/persistent_session/emqx_persistent_session.erl +++ /dev/null @@ -1,562 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_persistent_session). - --export([ - is_store_enabled/0, - init_db_backend/0, - storage_backend/0, - storage_type/0 -]). - --export([ - discard/2, - discard_if_present/1, - lookup/1, - persist/3, - persist_message/1, - pending/1, - pending/2, - resume/3 -]). - --export([ - add_subscription/3, - remove_subscription/3 -]). - --export([ - mark_as_delivered/2, - mark_resume_begin/1 -]). - --export([ - pending_messages_in_db/2, - delete_session_message/1, - gc_session_messages/1, - session_message_info/2 -]). - --export([ - delete_message/1, - first_message_id/0, - next_message_id/1 -]). - --export_type([sess_msg_key/0]). - --include("emqx.hrl"). --include("emqx_channel.hrl"). --include("emqx_persistent_session.hrl"). --include_lib("snabbkaffe/include/snabbkaffe.hrl"). - --compile({inline, [is_store_enabled/0]}). - -%% NOTE: Order is significant because of traversal order of the table. --define(MARKER, 3). --define(DELIVERED, 2). --define(UNDELIVERED, 1). --define(ABANDONED, 0). - --type bin_timestamp() :: <<_:64>>. --opaque sess_msg_key() :: - {emqx_guid:guid(), emqx_guid:guid(), emqx_types:topic(), ?UNDELIVERED | ?DELIVERED} - | {emqx_guid:guid(), emqx_guid:guid(), <<>>, ?MARKER} - | {emqx_guid:guid(), <<>>, bin_timestamp(), ?ABANDONED}. - --type gc_traverse_fun() :: fun(('delete' | 'marker' | 'abandoned', sess_msg_key()) -> 'ok'). - -%% EMQX configuration keys --define(conf_storage_backend, [persistent_session_store, backend, type]). - -%%-------------------------------------------------------------------- -%% Init -%%-------------------------------------------------------------------- - -init_db_backend() -> - case is_store_enabled() of - true -> - StorageType = storage_type(), - ok = emqx_trie:create_session_trie(StorageType), - ok = emqx_session_router:create_router_tab(StorageType), - case storage_backend() of - builtin -> - emqx_persistent_session_backend_builtin:create_tables(), - persistent_term:put(?db_backend_key, emqx_persistent_session_backend_builtin) - end, - ok; - false -> - persistent_term:put(?db_backend_key, emqx_persistent_session_backend_dummy), - ok - end. - -is_store_enabled() -> - emqx_config:get(?is_enabled_key). - --spec storage_backend() -> builtin. -storage_backend() -> - emqx_config:get(?conf_storage_backend). - -%%-------------------------------------------------------------------- -%% Session message ADT API -%%-------------------------------------------------------------------- - --spec session_message_info('timestamp' | 'session_id', sess_msg_key()) -> term(). -session_message_info(timestamp, {_, <<>>, <>, ?ABANDONED}) -> TS; -session_message_info(timestamp, {_, GUID, _, _}) -> emqx_guid:timestamp(GUID); -session_message_info(session_id, {SessionID, _, _, _}) -> SessionID. - -%%-------------------------------------------------------------------- -%% DB API -%%-------------------------------------------------------------------- - -first_message_id() -> - ?db_backend:first_message_id(). - -next_message_id(Key) -> - ?db_backend:next_message_id(Key). - -delete_message(Key) -> - ?db_backend:delete_message(Key). - -first_session_message() -> - ?db_backend:first_session_message(). - -next_session_message(Key) -> - ?db_backend:next_session_message(Key). - -delete_session_message(Key) -> - ?db_backend:delete_session_message(Key). - -put_session_store(#session_store{} = SS) -> - ?db_backend:put_session_store(SS). - -delete_session_store(ClientID) -> - ?db_backend:delete_session_store(ClientID). - -lookup_session_store(ClientID) -> - ?db_backend:lookup_session_store(ClientID). - -put_session_message({_, _, _, _} = Key) -> - ?db_backend:put_session_message(#session_msg{key = Key}). - -put_message(Msg) -> - ?db_backend:put_message(Msg). - -get_message(MsgId) -> - ?db_backend:get_message(MsgId). - -pending_messages_in_db(SessionID, MarkerIds) -> - ?db_backend:ro_transaction(pending_messages_fun(SessionID, MarkerIds)). - -%%-------------------------------------------------------------------- -%% Session API -%%-------------------------------------------------------------------- - -%% The timestamp (TS) is the last time a client interacted with the session, -%% or when the client disconnected. --spec persist( - emqx_types:clientinfo(), - emqx_types:conninfo(), - emqx_session:session() -) -> emqx_session:session(). - -persist(#{clientid := ClientID}, ConnInfo, Session) -> - case ClientID == undefined orelse not emqx_session:info(is_persistent, Session) of - true -> - Session; - false -> - SS = #session_store{ - client_id = ClientID, - expiry_interval = maps:get(expiry_interval, ConnInfo), - ts = timestamp_from_conninfo(ConnInfo), - session = Session - }, - case persistent_session_status(SS) of - not_persistent -> - Session; - expired -> - discard(ClientID, Session); - persistent -> - put_session_store(SS), - Session - end - end. - -timestamp_from_conninfo(ConnInfo) -> - case maps:get(disconnected_at, ConnInfo, undefined) of - undefined -> erlang:system_time(millisecond); - Disconnect -> Disconnect - end. - -lookup(ClientID) when is_binary(ClientID) -> - case is_store_enabled() of - false -> - none; - true -> - case lookup_session_store(ClientID) of - none -> - none; - {value, #session_store{session = S} = SS} -> - case persistent_session_status(SS) of - expired -> {expired, S}; - persistent -> {persistent, S} - end - end - end. - --spec discard_if_present(binary()) -> 'ok'. -discard_if_present(ClientID) -> - case lookup(ClientID) of - none -> - ok; - {Tag, Session} when Tag =:= persistent; Tag =:= expired -> - _ = discard(ClientID, Session), - ok - end. - --spec discard(binary(), emqx_session:session()) -> emqx_session:session(). -discard(ClientID, Session) -> - discard_opt(is_store_enabled(), ClientID, Session). - -discard_opt(false, _ClientID, Session) -> - emqx_session:set_field(is_persistent, false, Session); -discard_opt(true, ClientID, Session) -> - delete_session_store(ClientID), - SessionID = emqx_session:info(id, Session), - put_session_message({SessionID, <<>>, <<(erlang:system_time(microsecond)):64>>, ?ABANDONED}), - Subscriptions = emqx_session:info(subscriptions, Session), - emqx_session_router:delete_routes(SessionID, Subscriptions), - emqx_session:set_field(is_persistent, false, Session). - --spec mark_resume_begin(emqx_session:session_id()) -> emqx_guid:guid(). -mark_resume_begin(SessionID) -> - MarkerID = emqx_guid:gen(), - put_session_message({SessionID, MarkerID, <<>>, ?MARKER}), - MarkerID. - -add_subscription(TopicFilter, SessionID, true = _IsPersistent) -> - case is_store_enabled() of - true -> emqx_session_router:do_add_route(TopicFilter, SessionID); - false -> ok - end; -add_subscription(_TopicFilter, _SessionID, false = _IsPersistent) -> - ok. - -remove_subscription(TopicFilter, SessionID, true = _IsPersistent) -> - case is_store_enabled() of - true -> emqx_session_router:do_delete_route(TopicFilter, SessionID); - false -> ok - end; -remove_subscription(_TopicFilter, _SessionID, false = _IsPersistent) -> - ok. - -%%-------------------------------------------------------------------- -%% Resuming from DB state -%%-------------------------------------------------------------------- - -%% Must be called inside a emqx_cm_locker transaction. --spec resume(emqx_types:clientinfo(), emqx_types:conninfo(), emqx_session:session()) -> - {emqx_session:session(), [emqx_types:deliver()]}. -resume(ClientInfo, ConnInfo, Session) -> - SessionID = emqx_session:info(id, Session), - ?tp(ps_resuming, #{from => db, sid => SessionID}), - - %% NOTE: Order is important! - - %% 1. Get pending messages from DB. - ?tp(ps_initial_pendings, #{sid => SessionID}), - Pendings1 = pending(SessionID), - ?tp(ps_got_initial_pendings, #{ - sid => SessionID, - msgs => Pendings1 - }), - - %% 2. Enqueue messages to mimic that the process was alive - %% when the messages were delivered. - ?tp(ps_persist_pendings, #{sid => SessionID}), - Session1 = emqx_session:enqueue(ClientInfo, Pendings1, Session), - Session2 = persist(ClientInfo, ConnInfo, Session1), - mark_as_delivered(SessionID, Pendings1), - ?tp(ps_persist_pendings_msgs, #{ - msgs => Pendings1, - sid => SessionID - }), - - %% 3. Notify writers that we are resuming. - %% They will buffer new messages. - ?tp(ps_notify_writers, #{sid => SessionID}), - Nodes = mria:running_nodes(), - NodeMarkers = resume_begin(Nodes, SessionID), - ?tp(ps_node_markers, #{sid => SessionID, markers => NodeMarkers}), - - %% 4. Subscribe to topics. - ?tp(ps_resume_session, #{sid => SessionID}), - ok = emqx_session:resume(ClientInfo, Session2), - - %% 5. Get pending messages from DB until we find all markers. - ?tp(ps_marker_pendings, #{sid => SessionID}), - MarkerIDs = [Marker || {_, Marker} <- NodeMarkers], - Pendings2 = pending(SessionID, MarkerIDs), - ?tp(ps_marker_pendings_msgs, #{ - sid => SessionID, - msgs => Pendings2 - }), - - %% 6. Get pending messages from writers. - ?tp(ps_resume_end, #{sid => SessionID}), - WriterPendings = resume_end(Nodes, SessionID), - ?tp(ps_writer_pendings, #{ - msgs => WriterPendings, - sid => SessionID - }), - - %% 7. Drain the inbox and usort the messages - %% with the pending messages. (Should be done by caller.) - {Session2, Pendings2 ++ WriterPendings}. - -resume_begin(Nodes, SessionID) -> - Res = emqx_persistent_session_proto_v1:resume_begin(Nodes, self(), SessionID), - [{Node, Marker} || {{ok, {ok, Marker}}, Node} <- lists:zip(Res, Nodes)]. - -resume_end(Nodes, SessionID) -> - Res = emqx_persistent_session_proto_v1:resume_end(Nodes, self(), SessionID), - ?tp(ps_erpc_multical_result, #{res => Res, sid => SessionID}), - %% TODO: Should handle the errors - [ - {deliver, STopic, M} - || {ok, {ok, Messages}} <- Res, - {{M, STopic}} <- Messages - ]. - -%%-------------------------------------------------------------------- -%% Messages API -%%-------------------------------------------------------------------- - -persist_message(Msg) -> - case is_store_enabled() of - true -> do_persist_message(Msg); - false -> ok - end. - -do_persist_message(Msg) -> - case emqx_message:get_flag(dup, Msg) orelse emqx_message:is_sys(Msg) of - true -> - ok; - false -> - case emqx_session_router:match_routes(emqx_message:topic(Msg)) of - [] -> - ok; - Routes -> - put_message(Msg), - MsgId = emqx_message:id(Msg), - persist_message_routes(Routes, MsgId, Msg) - end - end. - -persist_message_routes([#route{dest = SessionID, topic = STopic} | Left], MsgId, Msg) -> - ?tp(ps_persist_msg, #{sid => SessionID, payload => emqx_message:payload(Msg)}), - put_session_message({SessionID, MsgId, STopic, ?UNDELIVERED}), - emqx_session_router:buffer(SessionID, STopic, Msg), - persist_message_routes(Left, MsgId, Msg); -persist_message_routes([], _MsgId, _Msg) -> - ok. - -mark_as_delivered(SessionID, List) -> - case is_store_enabled() of - true -> do_mark_as_delivered(SessionID, List); - false -> ok - end. - -do_mark_as_delivered(SessionID, [{deliver, STopic, Msg} | Left]) -> - MsgID = emqx_message:id(Msg), - case next_session_message({SessionID, MsgID, STopic, ?ABANDONED}) of - {SessionID, MsgID, STopic, ?UNDELIVERED} = Key -> - %% We can safely delete this entry - %% instead of marking it as delivered. - delete_session_message(Key); - _ -> - put_session_message({SessionID, MsgID, STopic, ?DELIVERED}) - end, - do_mark_as_delivered(SessionID, Left); -do_mark_as_delivered(_SessionID, []) -> - ok. - --spec pending(emqx_session:session_id()) -> - [{emqx_types:message(), STopic :: binary()}]. -pending(SessionID) -> - pending_messages_in_db(SessionID, []). - --spec pending(emqx_session:session_id(), MarkerIDs :: [emqx_guid:guid()]) -> - [{emqx_types:message(), STopic :: binary()}]. -pending(SessionID, MarkerIds) -> - %% TODO: Handle lost MarkerIDs - case emqx_session_router:pending(SessionID, MarkerIds) of - incomplete -> - timer:sleep(10), - pending(SessionID, MarkerIds); - Delivers -> - Delivers - end. - -%%-------------------------------------------------------------------- -%% Session internal functions -%%-------------------------------------------------------------------- - -%% @private [MQTT-3.1.2-23] -persistent_session_status(#session_store{expiry_interval = 0}) -> - not_persistent; -persistent_session_status(#session_store{expiry_interval = ?EXPIRE_INTERVAL_INFINITE}) -> - persistent; -persistent_session_status(#session_store{expiry_interval = E, ts = TS}) -> - case E + TS > erlang:system_time(millisecond) of - true -> persistent; - false -> expired - end. - -%%-------------------------------------------------------------------- -%% Pending messages internal functions -%%-------------------------------------------------------------------- - -pending_messages_fun(SessionID, MarkerIds) -> - fun() -> - case pending_messages({SessionID, <<>>, <<>>, ?DELIVERED}, [], MarkerIds) of - {Pending, []} -> read_pending_msgs(Pending, []); - {_Pending, [_ | _]} -> incomplete - end - end. - -read_pending_msgs([{MsgId, STopic} | Left], Acc) -> - Acc1 = - try - [{deliver, STopic, get_message(MsgId)} | Acc] - catch - error:{msg_not_found, _} -> - HighwaterMark = - erlang:system_time(microsecond) - - emqx_config:get(?msg_retain) * 1000, - case emqx_guid:timestamp(MsgId) < HighwaterMark of - %% Probably cleaned by GC - true -> Acc; - false -> error({msg_not_found, MsgId}) - end - end, - read_pending_msgs(Left, Acc1); -read_pending_msgs([], Acc) -> - lists:reverse(Acc). - -%% The keys are ordered by -%% {session_id(), <<>>, bin_timestamp(), ?ABANDONED} For abandoned sessions (clean started or expired). -%% {session_id(), emqx_guid:guid(), STopic :: binary(), ?DELIVERED | ?UNDELIVERED | ?MARKER} -%% where -%% <<>> < emqx_guid:guid() -%% <<>> < bin_timestamp() -%% emqx_guid:guid() is ordered in ts() and by node() -%% ?ABANDONED < ?UNDELIVERED < ?DELIVERED < ?MARKER -%% -%% We traverse the table until we reach another session. -%% TODO: Garbage collect the delivered messages. -pending_messages({SessionID, PrevMsgId, PrevSTopic, PrevTag} = PrevKey, Acc, MarkerIds) -> - case next_session_message(PrevKey) of - {S, <<>>, _TS, ?ABANDONED} when S =:= SessionID -> - {[], []}; - {S, MsgId, <<>>, ?MARKER} = Key when S =:= SessionID -> - MarkerIds1 = MarkerIds -- [MsgId], - case PrevTag =:= ?UNDELIVERED of - false -> pending_messages(Key, Acc, MarkerIds1); - true -> pending_messages(Key, [{PrevMsgId, PrevSTopic} | Acc], MarkerIds1) - end; - {S, MsgId, STopic, ?DELIVERED} = Key when - S =:= SessionID, - MsgId =:= PrevMsgId, - STopic =:= PrevSTopic - -> - pending_messages(Key, Acc, MarkerIds); - {S, _MsgId, _STopic, _Tag} = Key when S =:= SessionID -> - case PrevTag =:= ?UNDELIVERED of - false -> pending_messages(Key, Acc, MarkerIds); - true -> pending_messages(Key, [{PrevMsgId, PrevSTopic} | Acc], MarkerIds) - end; - %% Next session_id or '$end_of_table' - _What -> - case PrevTag =:= ?UNDELIVERED of - false -> {lists:reverse(Acc), MarkerIds}; - true -> {lists:reverse([{PrevMsgId, PrevSTopic} | Acc]), MarkerIds} - end - end. - -%%-------------------------------------------------------------------- -%% Garbage collection -%%-------------------------------------------------------------------- - --spec gc_session_messages(gc_traverse_fun()) -> 'ok'. -gc_session_messages(Fun) -> - gc_traverse(first_session_message(), <<>>, false, Fun). - -gc_traverse('$end_of_table', _SessionID, _Abandoned, _Fun) -> - ok; -gc_traverse({S, <<>>, _TS, ?ABANDONED} = Key, _SessionID, _Abandoned, Fun) -> - %% Only report the abandoned session if it has no messages. - %% We want to keep the abandoned marker to last to make the GC reentrant. - case next_session_message(Key) of - '$end_of_table' = NextKey -> - ok = Fun(abandoned, Key), - gc_traverse(NextKey, S, true, Fun); - {S2, _, _, _} = NextKey when S =:= S2 -> - gc_traverse(NextKey, S, true, Fun); - {_, _, _, _} = NextKey -> - ok = Fun(abandoned, Key), - gc_traverse(NextKey, S, true, Fun) - end; -gc_traverse({S, _MsgID, <<>>, ?MARKER} = Key, SessionID, Abandoned, Fun) -> - ok = Fun(marker, Key), - NewAbandoned = S =:= SessionID andalso Abandoned, - gc_traverse(next_session_message(Key), S, NewAbandoned, Fun); -gc_traverse({S, _MsgID, _STopic, _Tag} = Key, SessionID, Abandoned, Fun) when - Abandoned andalso - S =:= SessionID --> - %% Delete all messages from an abandoned session. - ok = Fun(delete, Key), - gc_traverse(next_session_message(Key), S, Abandoned, Fun); -gc_traverse({S, MsgID, STopic, ?UNDELIVERED} = Key, SessionID, Abandoned, Fun) -> - case next_session_message(Key) of - {S1, M, ST, ?DELIVERED} = NextKey when - S1 =:= S andalso - MsgID =:= M andalso - STopic =:= ST - -> - %% We have both markers for the same message/topic so it is safe to delete both. - ok = Fun(delete, Key), - ok = Fun(delete, NextKey), - gc_traverse(next_session_message(NextKey), S, Abandoned, Fun); - NextKey -> - %% Something else is here, so let's just loop. - NewAbandoned = S =:= SessionID andalso Abandoned, - gc_traverse(NextKey, SessionID, NewAbandoned, Fun) - end; -gc_traverse({S, _MsgID, _STopic, ?DELIVERED} = Key, SessionID, Abandoned, Fun) -> - %% We have a message that is marked as ?DELIVERED, but the ?UNDELIVERED is missing. - NewAbandoned = S =:= SessionID andalso Abandoned, - gc_traverse(next_session_message(Key), S, NewAbandoned, Fun). - --spec storage_type() -> ram | disc. -storage_type() -> - case emqx_config:get(?on_disc_key) of - true -> disc; - false -> ram - end. diff --git a/apps/emqx/src/persistent_session/emqx_persistent_session.hrl b/apps/emqx/src/persistent_session/emqx_persistent_session.hrl deleted file mode 100644 index 5476d8daf..000000000 --- a/apps/emqx/src/persistent_session/emqx_persistent_session.hrl +++ /dev/null @@ -1,41 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --define(PERSISTENT_SESSION_SHARD, emqx_persistent_session_shard). - --record(session_store, { - client_id :: binary(), - expiry_interval :: non_neg_integer(), - ts :: non_neg_integer(), - session :: emqx_session:session() -}). - --record(session_msg, { - key :: emqx_persistent_session:sess_msg_key(), - val = [] :: [] -}). - --define(cfg_root, persistent_session_store). --define(db_backend_key, [?cfg_root, db_backend]). --define(is_enabled_key, [?cfg_root, enabled]). --define(msg_retain, [?cfg_root, max_retain_undelivered]). --define(on_disc_key, [?cfg_root, on_disc]). - --define(SESSION_STORE, emqx_session_store). --define(SESS_MSG_TAB, emqx_session_msg). --define(MSG_TAB, emqx_persistent_msg). - --define(db_backend, (persistent_term:get(?db_backend_key))). diff --git a/apps/emqx/src/persistent_session/emqx_persistent_session_backend_builtin.erl b/apps/emqx/src/persistent_session/emqx_persistent_session_backend_builtin.erl deleted file mode 100644 index 34305d7bc..000000000 --- a/apps/emqx/src/persistent_session/emqx_persistent_session_backend_builtin.erl +++ /dev/null @@ -1,157 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_persistent_session_backend_builtin). - --include("emqx.hrl"). --include_lib("typerefl/include/types.hrl"). --include("emqx_persistent_session.hrl"). - --export([ - create_tables/0, - first_message_id/0, - next_message_id/1, - delete_message/1, - first_session_message/0, - next_session_message/1, - delete_session_message/1, - put_session_store/1, - delete_session_store/1, - lookup_session_store/1, - put_session_message/1, - put_message/1, - get_message/1, - ro_transaction/1 -]). - --type mria_table_type() :: ram_copies | disc_copies | rocksdb_copies. - --define(IS_ETS(BACKEND), (BACKEND =:= ram_copies orelse BACKEND =:= disc_copies)). - -create_tables() -> - SessStoreBackend = table_type(session), - ok = mria:create_table(?SESSION_STORE, [ - {type, set}, - {rlog_shard, ?PERSISTENT_SESSION_SHARD}, - {storage, SessStoreBackend}, - {record_name, session_store}, - {attributes, record_info(fields, session_store)}, - {storage_properties, storage_properties(?SESSION_STORE, SessStoreBackend)} - ]), - - SessMsgBackend = table_type(session_messages), - ok = mria:create_table(?SESS_MSG_TAB, [ - {type, ordered_set}, - {rlog_shard, ?PERSISTENT_SESSION_SHARD}, - {storage, SessMsgBackend}, - {record_name, session_msg}, - {attributes, record_info(fields, session_msg)}, - {storage_properties, storage_properties(?SESS_MSG_TAB, SessMsgBackend)} - ]), - - MsgBackend = table_type(messages), - ok = mria:create_table(?MSG_TAB, [ - {type, ordered_set}, - {rlog_shard, ?PERSISTENT_SESSION_SHARD}, - {storage, MsgBackend}, - {record_name, message}, - {attributes, record_info(fields, message)}, - {storage_properties, storage_properties(?MSG_TAB, MsgBackend)} - ]). - -first_session_message() -> - mnesia:dirty_first(?SESS_MSG_TAB). - -next_session_message(Key) -> - mnesia:dirty_next(?SESS_MSG_TAB, Key). - -first_message_id() -> - mnesia:dirty_first(?MSG_TAB). - -next_message_id(Key) -> - mnesia:dirty_next(?MSG_TAB, Key). - -delete_message(Key) -> - mria:dirty_delete(?MSG_TAB, Key). - -delete_session_message(Key) -> - mria:dirty_delete(?SESS_MSG_TAB, Key). - -put_session_store(SS) -> - mria:dirty_write(?SESSION_STORE, SS). - -delete_session_store(ClientID) -> - mria:dirty_delete(?SESSION_STORE, ClientID). - -lookup_session_store(ClientID) -> - case mnesia:dirty_read(?SESSION_STORE, ClientID) of - [] -> none; - [SS] -> {value, SS} - end. - -put_session_message(SessMsg) -> - mria:dirty_write(?SESS_MSG_TAB, SessMsg). - -put_message(Msg) -> - mria:dirty_write(?MSG_TAB, Msg). - -get_message(MsgId) -> - case mnesia:read(?MSG_TAB, MsgId) of - [] -> error({msg_not_found, MsgId}); - [Msg] -> Msg - end. - -ro_transaction(Fun) -> - {atomic, Res} = mria:ro_transaction(?PERSISTENT_SESSION_SHARD, Fun), - Res. - --spec storage_properties(?SESSION_STORE | ?SESS_MSG_TAB | ?MSG_TAB, mria_table_type()) -> term(). -storage_properties(?SESSION_STORE, Backend) when ?IS_ETS(Backend) -> - [{ets, [{read_concurrency, true}]}]; -storage_properties(_, Backend) when ?IS_ETS(Backend) -> - [ - {ets, [ - {read_concurrency, true}, - {write_concurrency, true} - ]} - ]; -storage_properties(_, _) -> - []. - -%% Dialyzer sees the compiled literal in -%% `mria:rocksdb_backend_available/0' and complains about the -%% complementar match arm... --dialyzer({no_match, table_type/1}). --spec table_type(atom()) -> mria_table_type(). -table_type(Table) -> - DiscPersistence = emqx_config:get([?cfg_root, on_disc]), - RamCache = get_overlayed(Table, ram_cache), - RocksDBAvailable = mria:rocksdb_backend_available(), - case {DiscPersistence, RamCache, RocksDBAvailable} of - {true, true, _} -> - disc_copies; - {true, false, true} -> - rocksdb_copies; - {true, false, false} -> - disc_copies; - {false, _, _} -> - ram_copies - end. - --spec get_overlayed(atom(), on_disc | ram_cache) -> boolean(). -get_overlayed(Table, Suffix) -> - Default = emqx_config:get([?cfg_root, Suffix]), - emqx_config:get([?cfg_root, backend, Table, Suffix], Default). diff --git a/apps/emqx/src/persistent_session/emqx_persistent_session_backend_dummy.erl b/apps/emqx/src/persistent_session/emqx_persistent_session_backend_dummy.erl deleted file mode 100644 index 1b8beef33..000000000 --- a/apps/emqx/src/persistent_session/emqx_persistent_session_backend_dummy.erl +++ /dev/null @@ -1,76 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_persistent_session_backend_dummy). - --include("emqx_persistent_session.hrl"). - --export([ - first_message_id/0, - next_message_id/1, - delete_message/1, - first_session_message/0, - next_session_message/1, - delete_session_message/1, - put_session_store/1, - delete_session_store/1, - lookup_session_store/1, - put_session_message/1, - put_message/1, - get_message/1, - ro_transaction/1 -]). - -first_message_id() -> - '$end_of_table'. - -next_message_id(_) -> - '$end_of_table'. - --spec delete_message(binary()) -> no_return(). -delete_message(_Key) -> - error(should_not_be_called). - -first_session_message() -> - '$end_of_table'. - -next_session_message(_Key) -> - '$end_of_table'. - -delete_session_message(_Key) -> - ok. - -put_session_store(#session_store{}) -> - ok. - -delete_session_store(_ClientID) -> - ok. - -lookup_session_store(_ClientID) -> - none. - -put_session_message({_, _, _, _}) -> - ok. - -put_message(_Msg) -> - ok. - --spec get_message(binary()) -> no_return(). -get_message(_MsgId) -> - error(should_not_be_called). - -ro_transaction(Fun) -> - Fun(). diff --git a/apps/emqx/src/persistent_session/emqx_persistent_session_gc.erl b/apps/emqx/src/persistent_session/emqx_persistent_session_gc.erl deleted file mode 100644 index 4aa59cdef..000000000 --- a/apps/emqx/src/persistent_session/emqx_persistent_session_gc.erl +++ /dev/null @@ -1,163 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- --module(emqx_persistent_session_gc). - --behaviour(gen_server). - --include("emqx_persistent_session.hrl"). - -%% API --export([start_link/0]). - -%% gen_server callbacks --export([ - init/1, - handle_call/3, - handle_cast/2, - handle_info/2, - terminate/2 -]). - --ifdef(TEST). --export([ - session_gc_worker/2, - message_gc_worker/0 -]). --endif. - --define(SERVER, ?MODULE). -%% TODO: Maybe these should be configurable? --define(MARKER_GRACE_PERIOD, 60000000). --define(ABANDONED_GRACE_PERIOD, 300000000). - -%%-------------------------------------------------------------------- -%% API -%%-------------------------------------------------------------------- - -start_link() -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). - -%%-------------------------------------------------------------------- -%% gen_server callbacks -%%-------------------------------------------------------------------- - -init([]) -> - process_flag(trap_exit, true), - mria_rlog:ensure_shard(?PERSISTENT_SESSION_SHARD), - {ok, start_message_gc_timer(start_session_gc_timer(#{}))}. - -handle_call(_Request, _From, State) -> - Reply = ok, - {reply, Reply, State}. - -handle_cast(_Request, State) -> - {noreply, State}. - -handle_info({timeout, Ref, session_gc_timeout}, State) -> - State1 = session_gc_timeout(Ref, State), - {noreply, State1}; -handle_info({timeout, Ref, message_gc_timeout}, State) -> - State1 = message_gc_timeout(Ref, State), - {noreply, State1}; -handle_info(_Info, State) -> - {noreply, State}. - -terminate(_Reason, _State) -> - ok. - -%%-------------------------------------------------------------------- -%% Internal functions -%%-------------------------------------------------------------------- - -%%-------------------------------------------------------------------- -%% Session messages GC -%%-------------------------------------------------------------------- - -start_session_gc_timer(State) -> - Interval = emqx_config:get([persistent_session_store, session_message_gc_interval]), - State#{session_gc_timer => erlang:start_timer(Interval, self(), session_gc_timeout)}. - -session_gc_timeout(Ref, #{session_gc_timer := R} = State) when R =:= Ref -> - %% Prevent overlapping processes. - GCPid = maps:get(session_gc_pid, State, undefined), - case GCPid =/= undefined andalso erlang:is_process_alive(GCPid) of - true -> - start_session_gc_timer(State); - false -> - start_session_gc_timer(State#{ - session_gc_pid => proc_lib:spawn_link(fun session_gc_worker/0) - }) - end; -session_gc_timeout(_Ref, State) -> - State. - -session_gc_worker() -> - ok = emqx_persistent_session:gc_session_messages(fun session_gc_worker/2). - -session_gc_worker(delete, Key) -> - emqx_persistent_session:delete_session_message(Key); -session_gc_worker(marker, Key) -> - TS = emqx_persistent_session:session_message_info(timestamp, Key), - case TS + ?MARKER_GRACE_PERIOD < erlang:system_time(microsecond) of - true -> emqx_persistent_session:delete_session_message(Key); - false -> ok - end; -session_gc_worker(abandoned, Key) -> - TS = emqx_persistent_session:session_message_info(timestamp, Key), - case TS + ?ABANDONED_GRACE_PERIOD < erlang:system_time(microsecond) of - true -> emqx_persistent_session:delete_session_message(Key); - false -> ok - end. - -%%-------------------------------------------------------------------- -%% Message GC -%% -------------------------------------------------------------------- -%% The message GC simply removes all messages older than the retain -%% period. A more exact GC would either involve treating the session -%% message table as root set, or some kind of reference counting. -%% We sacrifice space for simplicity at this point. -start_message_gc_timer(State) -> - Interval = emqx_config:get([persistent_session_store, session_message_gc_interval]), - State#{message_gc_timer => erlang:start_timer(Interval, self(), message_gc_timeout)}. - -message_gc_timeout(Ref, #{message_gc_timer := R} = State) when R =:= Ref -> - %% Prevent overlapping processes. - GCPid = maps:get(message_gc_pid, State, undefined), - case GCPid =/= undefined andalso erlang:is_process_alive(GCPid) of - true -> - start_message_gc_timer(State); - false -> - start_message_gc_timer(State#{ - message_gc_pid => proc_lib:spawn_link(fun message_gc_worker/0) - }) - end; -message_gc_timeout(_Ref, State) -> - State. - -message_gc_worker() -> - HighWaterMark = erlang:system_time(microsecond) - emqx_config:get(?msg_retain) * 1000, - message_gc_worker(emqx_persistent_session:first_message_id(), HighWaterMark). - -message_gc_worker('$end_of_table', _HighWaterMark) -> - ok; -message_gc_worker(MsgId, HighWaterMark) -> - case emqx_guid:timestamp(MsgId) < HighWaterMark of - true -> - emqx_persistent_session:delete_message(MsgId), - message_gc_worker(emqx_persistent_session:next_message_id(MsgId), HighWaterMark); - false -> - ok - end. diff --git a/apps/emqx/src/persistent_session/emqx_persistent_session_sup.erl b/apps/emqx/src/persistent_session/emqx_persistent_session_sup.erl deleted file mode 100644 index 3018df96a..000000000 --- a/apps/emqx/src/persistent_session/emqx_persistent_session_sup.erl +++ /dev/null @@ -1,69 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_persistent_session_sup). - --behaviour(supervisor). - --export([start_link/0]). - --export([init/1]). - -start_link() -> - supervisor:start_link({local, ?MODULE}, ?MODULE, []). - -init([]) -> - %% We want this supervisor to own the table for restarts - SessionTab = emqx_session_router:create_init_tab(), - - %% Resume worker sup - ResumeSup = #{ - id => router_worker_sup, - start => {emqx_session_router_worker_sup, start_link, [SessionTab]}, - restart => permanent, - shutdown => 2000, - type => supervisor, - modules => [emqx_session_router_worker_sup] - }, - - SessionRouterPool = emqx_pool_sup:spec( - session_router_pool, - [ - session_router_pool, - hash, - {emqx_session_router, start_link, []} - ] - ), - - GCWorker = child_spec(emqx_persistent_session_gc, worker), - - Spec = #{ - strategy => one_for_all, - intensity => 0, - period => 1 - }, - - {ok, {Spec, [ResumeSup, SessionRouterPool, GCWorker]}}. - -child_spec(Mod, worker) -> - #{ - id => Mod, - start => {Mod, start_link, []}, - restart => permanent, - shutdown => 15000, - type => worker, - modules => [Mod] - }. diff --git a/apps/emqx/src/proto/emqx_persistent_session_proto_v1.erl b/apps/emqx/src/proto/emqx_persistent_session_proto_v1.erl deleted file mode 100644 index 875f19852..000000000 --- a/apps/emqx/src/proto/emqx_persistent_session_proto_v1.erl +++ /dev/null @@ -1,41 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_persistent_session_proto_v1). - --behaviour(emqx_bpapi). - --export([ - introduced_in/0, - resume_begin/3, - resume_end/3 -]). - --include("bpapi.hrl"). --include("emqx.hrl"). - -introduced_in() -> - "5.0.0". - --spec resume_begin([node()], pid(), binary()) -> - emqx_rpc:erpc_multicall([{node(), emqx_guid:guid()}]). -resume_begin(Nodes, Pid, SessionID) when is_pid(Pid), is_binary(SessionID) -> - erpc:multicall(Nodes, emqx_session_router, resume_begin, [Pid, SessionID]). - --spec resume_end([node()], pid(), binary()) -> - emqx_rpc:erpc_multicall({'ok', [emqx_types:message()]} | {'error', term()}). -resume_end(Nodes, Pid, SessionID) when is_pid(Pid), is_binary(SessionID) -> - erpc:multicall(Nodes, emqx_session_router, resume_end, [Pid, SessionID]). diff --git a/apps/emqx/test/emqx_bpapi_static_checks.erl b/apps/emqx/test/emqx_bpapi_static_checks.erl index 56baf05e8..b44e564c7 100644 --- a/apps/emqx/test/emqx_bpapi_static_checks.erl +++ b/apps/emqx/test/emqx_bpapi_static_checks.erl @@ -53,11 +53,13 @@ -define(IGNORED_MODULES, "emqx_rpc"). -define(FORCE_DELETED_MODULES, [ emqx_statsd, - emqx_statsd_proto_v1 + emqx_statsd_proto_v1, + emqx_persistent_session_proto_v1 ]). -define(FORCE_DELETED_APIS, [ {emqx_statsd, 1}, - {emqx_plugin_libs, 1} + {emqx_plugin_libs, 1}, + {emqx_persistent_session, 1} ]). %% List of known RPC backend modules: -define(RPC_MODULES, "gen_rpc, erpc, rpc, emqx_rpc"). diff --git a/apps/emqx/test/emqx_persistent_session_SUITE.erl b/apps/emqx/test/emqx_persistent_session_SUITE.erl index 07cfabc70..8776d7361 100644 --- a/apps/emqx/test/emqx_persistent_session_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_session_SUITE.erl @@ -20,7 +20,6 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("../include/emqx.hrl"). --include("../src/persistent_session/emqx_persistent_session.hrl"). -compile(export_all). -compile(nowarn_export_all). @@ -51,76 +50,23 @@ all() -> groups() -> TCs = emqx_common_test_helpers:all(?MODULE), - SnabbkaffeTCs = [TC || TC <- TCs, is_snabbkaffe_tc(TC)], - GCTests = [TC || TC <- TCs, is_gc_tc(TC)], - OtherTCs = (TCs -- SnabbkaffeTCs) -- GCTests, [ - {persistent_store_enabled, [ - {group, ram_tables}, - {group, disc_tables} - ]}, {persistent_store_disabled, [{group, no_kill_connection_process}]}, - {ram_tables, [], [ - {group, no_kill_connection_process}, - {group, kill_connection_process}, - {group, snabbkaffe}, - {group, gc_tests} - ]}, - {disc_tables, [], [ - {group, no_kill_connection_process}, - {group, kill_connection_process}, - {group, snabbkaffe}, - {group, gc_tests} - ]}, {no_kill_connection_process, [], [{group, tcp}, {group, quic}, {group, ws}]}, - {kill_connection_process, [], [{group, tcp}, {group, quic}, {group, ws}]}, - {snabbkaffe, [], [ - {group, tcp_snabbkaffe}, {group, quic_snabbkaffe}, {group, ws_snabbkaffe} - ]}, - {tcp, [], OtherTCs}, - {quic, [], OtherTCs}, - {ws, [], OtherTCs}, - {tcp_snabbkaffe, [], SnabbkaffeTCs}, - {quic_snabbkaffe, [], SnabbkaffeTCs}, - {ws_snabbkaffe, [], SnabbkaffeTCs}, - {gc_tests, [], GCTests} + {tcp, [], TCs}, + {quic, [], TCs}, + {ws, [], TCs} ]. -is_snabbkaffe_tc(TC) -> - re:run(atom_to_list(TC), "^t_snabbkaffe_") /= nomatch. - -is_gc_tc(TC) -> - re:run(atom_to_list(TC), "^t_gc_") /= nomatch. - -init_per_group(persistent_store_enabled, Config) -> - [{persistent_store_enabled, true} | Config]; -init_per_group(Group, Config) when Group =:= ram_tables; Group =:= disc_tables -> - %% Start Apps - Reply = - case Group =:= ram_tables of - true -> ram; - false -> disc - end, - emqx_common_test_helpers:boot_modules(all), - meck:new(emqx_config, [non_strict, passthrough, no_history, no_link]), - meck:expect(emqx_config, get, fun - (?on_disc_key) -> Reply =:= disc; - (?is_enabled_key) -> true; - (Other) -> meck:passthrough([Other]) - end), - emqx_common_test_helpers:start_apps([], fun set_special_confs/1), - ?assertEqual(true, emqx_persistent_session:is_store_enabled()), - Config; init_per_group(persistent_store_disabled, Config) -> %% Start Apps emqx_common_test_helpers:boot_modules(all), meck:new(emqx_config, [non_strict, passthrough, no_history, no_link]), meck:expect(emqx_config, get, fun - (?is_enabled_key) -> false; + ([persistent_session_store, enabled]) -> false; (Other) -> meck:passthrough([Other]) end), emqx_common_test_helpers:start_apps([], fun set_special_confs/1), - ?assertEqual(false, emqx_persistent_session:is_store_enabled()), [{persistent_store_enabled, false} | Config]; init_per_group(Group, Config) when Group == ws; Group == ws_snabbkaffe -> [ @@ -140,43 +86,7 @@ init_per_group(Group, Config) when Group == quic; Group == quic_snabbkaffe -> init_per_group(no_kill_connection_process, Config) -> [{kill_connection_process, false} | Config]; init_per_group(kill_connection_process, Config) -> - [{kill_connection_process, true} | Config]; -init_per_group(snabbkaffe, Config) -> - [{kill_connection_process, true} | Config]; -init_per_group(gc_tests, Config) -> - %% We need to make sure the system does not interfere with this test group. - lists:foreach( - fun(ClientId) -> - maybe_kill_connection_process(ClientId, [{kill_connection_process, true}]) - end, - emqx_cm:all_client_ids() - ), - emqx_common_test_helpers:stop_apps([]), - SessionMsgEts = gc_tests_session_store, - MsgEts = gc_tests_msg_store, - Pid = spawn(fun() -> - ets:new(SessionMsgEts, [named_table, public, ordered_set]), - ets:new(MsgEts, [named_table, public, ordered_set, {keypos, 2}]), - receive - stop -> ok - end - end), - meck:new(mnesia, [non_strict, passthrough, no_history, no_link]), - meck:expect(mnesia, dirty_first, fun - (?SESS_MSG_TAB) -> ets:first(SessionMsgEts); - (?MSG_TAB) -> ets:first(MsgEts); - (X) -> meck:passthrough([X]) - end), - meck:expect(mnesia, dirty_next, fun - (?SESS_MSG_TAB, X) -> ets:next(SessionMsgEts, X); - (?MSG_TAB, X) -> ets:next(MsgEts, X); - (Tab, X) -> meck:passthrough([Tab, X]) - end), - meck:expect(mnesia, dirty_delete, fun - (?MSG_TAB, X) -> ets:delete(MsgEts, X); - (Tab, X) -> meck:passthrough([Tab, X]) - end), - [{store_owner, Pid}, {session_msg_store, SessionMsgEts}, {msg_store, MsgEts} | Config]. + [{kill_connection_process, true} | Config]. init_per_suite(Config) -> Config. @@ -188,15 +98,6 @@ end_per_suite(_Config) -> emqx_common_test_helpers:ensure_mnesia_stopped(), ok. -end_per_group(gc_tests, Config) -> - meck:unload(mnesia), - ?config(store_owner, Config) ! stop, - ok; -end_per_group(Group, _Config) when - Group =:= ram_tables; Group =:= disc_tables --> - meck:unload(emqx_config), - emqx_common_test_helpers:stop_apps([]); end_per_group(persistent_store_disabled, _Config) -> meck:unload(emqx_config), emqx_common_test_helpers:stop_apps([]); @@ -205,23 +106,12 @@ end_per_group(_Group, _Config) -> init_per_testcase(TestCase, Config) -> Config1 = preconfig_per_testcase(TestCase, Config), - case is_gc_tc(TestCase) of - true -> - ets:delete_all_objects(?config(msg_store, Config)), - ets:delete_all_objects(?config(session_msg_store, Config)); - false -> - skip - end, case erlang:function_exported(?MODULE, TestCase, 2) of true -> ?MODULE:TestCase(init, Config1); _ -> Config1 end. end_per_testcase(TestCase, Config) -> - case is_snabbkaffe_tc(TestCase) of - true -> snabbkaffe:stop(); - false -> skip - end, case erlang:function_exported(?MODULE, TestCase, 2) of true -> ?MODULE:TestCase('end', Config); false -> ok @@ -307,20 +197,6 @@ wait_for_cm_unregister(ClientId, N) -> wait_for_cm_unregister(ClientId, N - 1) end. -snabbkaffe_sync_publish(Topic, Payloads) -> - Fun = fun(Client, Payload) -> - ?check_trace( - begin - ?wait_async_action( - {ok, _} = emqtt:publish(Client, Topic, Payload, 2), - #{?snk_kind := ps_persist_msg, payload := Payload} - ) - end, - fun(_, _Trace) -> ok end - ) - end, - do_publish(Payloads, Fun, true). - publish(Topic, Payloads) -> publish(Topic, Payloads, false). @@ -514,20 +390,6 @@ t_persist_on_disconnect(Config) -> ?assertEqual(0, client_info(session_present, Client2)), ok = emqtt:disconnect(Client2). -wait_for_pending(SId) -> - wait_for_pending(SId, 100). - -wait_for_pending(_SId, 0) -> - error(exhausted_wait_for_pending); -wait_for_pending(SId, N) -> - case emqx_persistent_session:pending(SId) of - [] -> - timer:sleep(1), - wait_for_pending(SId, N - 1); - [_ | _] = Pending -> - Pending - end. - t_process_dies_session_expires(Config) -> %% Emulate an error in the connect process, %% or that the node of the process goes down. @@ -552,36 +414,8 @@ t_process_dies_session_expires(Config) -> ok = publish(Topic, [Payload]), - SessionId = - case ?config(persistent_store_enabled, Config) of - false -> - undefined; - true -> - %% The session should not be marked as expired. - {Tag, Session} = emqx_persistent_session:lookup(ClientId), - ?assertEqual(persistent, Tag), - SId = emqx_session:info(id, Session), - case ?config(kill_connection_process, Config) of - true -> - %% The session should have a pending message - ?assertMatch([_], wait_for_pending(SId)); - false -> - skip - end, - SId - end, - timer:sleep(1100), - %% The session should now be marked as expired. - case - (?config(kill_connection_process, Config) andalso - ?config(persistent_store_enabled, Config)) - of - true -> ?assertMatch({expired, _}, emqx_persistent_session:lookup(ClientId)); - false -> skip - end, - {ok, Client2} = emqtt:start_link([ {proto_ver, v5}, {clientid, ClientId}, @@ -592,21 +426,6 @@ t_process_dies_session_expires(Config) -> {ok, _} = emqtt:ConnFun(Client2), ?assertEqual(0, client_info(session_present, Client2)), - case - (?config(kill_connection_process, Config) andalso - ?config(persistent_store_enabled, Config)) - of - true -> - %% The session should be a fresh one - {persistent, NewSession} = emqx_persistent_session:lookup(ClientId), - ?assertNotEqual(SessionId, emqx_session:info(id, NewSession)), - %% The old session should now either - %% be marked as abandoned or already be garbage collected. - ?assertMatch([], emqx_persistent_session:pending(SessionId)); - false -> - skip - end, - %% We should not receive the pending message ?assertEqual([], receive_messages(1)), @@ -724,7 +543,6 @@ t_clean_start_drops_subscriptions(Config) -> t_unsubscribe(Config) -> ConnFun = ?config(conn_fun, Config), - Topic = ?config(topic, Config), STopic = ?config(stopic, Config), ClientId = ?config(client_id, Config), {ok, Client} = emqtt:start_link([ @@ -735,22 +553,9 @@ t_unsubscribe(Config) -> ]), {ok, _} = emqtt:ConnFun(Client), {ok, _, [2]} = emqtt:subscribe(Client, STopic, qos2), - case emqx_persistent_session:is_store_enabled() of - true -> - {persistent, Session} = emqx_persistent_session:lookup(ClientId), - SessionID = emqx_session:info(id, Session), - SessionIDs = [SId || #route{dest = SId} <- emqx_session_router:match_routes(Topic)], - ?assert(lists:member(SessionID, SessionIDs)), - ?assertMatch([_], [Sub || {ST, _} = Sub <- emqtt:subscriptions(Client), ST =:= STopic]), - {ok, _, _} = emqtt:unsubscribe(Client, STopic), - ?assertMatch([], [Sub || {ST, _} = Sub <- emqtt:subscriptions(Client), ST =:= STopic]), - SessionIDs2 = [SId || #route{dest = SId} <- emqx_session_router:match_routes(Topic)], - ?assert(not lists:member(SessionID, SessionIDs2)); - false -> - ?assertMatch([_], [Sub || {ST, _} = Sub <- emqtt:subscriptions(Client), ST =:= STopic]), - {ok, _, _} = emqtt:unsubscribe(Client, STopic), - ?assertMatch([], [Sub || {ST, _} = Sub <- emqtt:subscriptions(Client), ST =:= STopic]) - end, + ?assertMatch([_], [Sub || {ST, _} = Sub <- emqtt:subscriptions(Client), ST =:= STopic]), + {ok, _, _} = emqtt:unsubscribe(Client, STopic), + ?assertMatch([], [Sub || {ST, _} = Sub <- emqtt:subscriptions(Client), ST =:= STopic]), ok = emqtt:disconnect(Client). t_multiple_subscription_matches(Config) -> @@ -794,515 +599,3 @@ t_multiple_subscription_matches(Config) -> ?assertEqual({ok, 2}, maps:find(qos, Msg1)), ?assertEqual({ok, 2}, maps:find(qos, Msg2)), ok = emqtt:disconnect(Client2). - -t_lost_messages_because_of_gc(init, Config) -> - case - (emqx_persistent_session:is_store_enabled() andalso - ?config(kill_connection_process, Config)) - of - true -> - Retain = 1000, - OldRetain = emqx_config:get(?msg_retain, Retain), - emqx_config:put(?msg_retain, Retain), - [{retain, Retain}, {old_retain, OldRetain} | Config]; - false -> - {skip, only_relevant_with_store_and_kill_process} - end; -t_lost_messages_because_of_gc('end', Config) -> - OldRetain = ?config(old_retain, Config), - emqx_config:put(?msg_retain, OldRetain), - ok. - -t_lost_messages_because_of_gc(Config) -> - ConnFun = ?config(conn_fun, Config), - Topic = ?config(topic, Config), - STopic = ?config(stopic, Config), - ClientId = ?config(client_id, Config), - Retain = ?config(retain, Config), - Payload1 = <<"hello1">>, - Payload2 = <<"hello2">>, - {ok, Client1} = emqtt:start_link([ - {clientid, ClientId}, - {proto_ver, v5}, - {properties, #{'Session-Expiry-Interval' => 30}} - | Config - ]), - {ok, _} = emqtt:ConnFun(Client1), - {ok, _, [2]} = emqtt:subscribe(Client1, STopic, qos2), - emqtt:disconnect(Client1), - maybe_kill_connection_process(ClientId, Config), - publish(Topic, Payload1), - timer:sleep(2 * Retain), - publish(Topic, Payload2), - emqx_persistent_session_gc:message_gc_worker(), - {ok, Client2} = emqtt:start_link([ - {clientid, ClientId}, - {clean_start, false}, - {proto_ver, v5}, - {properties, #{'Session-Expiry-Interval' => 30}} - | Config - ]), - {ok, _} = emqtt:ConnFun(Client2), - Msgs = receive_messages(2), - ?assertMatch([_], Msgs), - ?assertEqual({ok, iolist_to_binary(Payload2)}, maps:find(payload, hd(Msgs))), - emqtt:disconnect(Client2), - ok. - -%%-------------------------------------------------------------------- -%% Snabbkaffe helpers -%%-------------------------------------------------------------------- - -check_snabbkaffe_vanilla(Trace) -> - ResumeTrace = [ - T - || #{?snk_kind := K} = T <- Trace, - re:run(to_list(K), "^ps_") /= nomatch - ], - ?assertMatch([_ | _], ResumeTrace), - [_Sid] = lists:usort(?projection(sid, ResumeTrace)), - %% Check internal flow of the emqx_cm resuming - ?assert( - ?strict_causality( - #{?snk_kind := ps_resuming}, - #{?snk_kind := ps_initial_pendings}, - ResumeTrace - ) - ), - ?assert( - ?strict_causality( - #{?snk_kind := ps_initial_pendings}, - #{?snk_kind := ps_persist_pendings}, - ResumeTrace - ) - ), - ?assert( - ?strict_causality( - #{?snk_kind := ps_persist_pendings}, - #{?snk_kind := ps_notify_writers}, - ResumeTrace - ) - ), - ?assert( - ?strict_causality( - #{?snk_kind := ps_notify_writers}, - #{?snk_kind := ps_node_markers}, - ResumeTrace - ) - ), - ?assert( - ?strict_causality( - #{?snk_kind := ps_node_markers}, - #{?snk_kind := ps_resume_session}, - ResumeTrace - ) - ), - ?assert( - ?strict_causality( - #{?snk_kind := ps_resume_session}, - #{?snk_kind := ps_marker_pendings}, - ResumeTrace - ) - ), - ?assert( - ?strict_causality( - #{?snk_kind := ps_marker_pendings}, - #{?snk_kind := ps_marker_pendings_msgs}, - ResumeTrace - ) - ), - ?assert( - ?strict_causality( - #{?snk_kind := ps_marker_pendings_msgs}, - #{?snk_kind := ps_resume_end}, - ResumeTrace - ) - ), - - %% Check flow between worker and emqx_cm - ?assert( - ?strict_causality( - #{?snk_kind := ps_notify_writers}, - #{?snk_kind := ps_worker_started}, - ResumeTrace - ) - ), - ?assert( - ?strict_causality( - #{?snk_kind := ps_marker_pendings}, - #{?snk_kind := ps_worker_resume_end}, - ResumeTrace - ) - ), - ?assert( - ?strict_causality( - #{?snk_kind := ps_worker_resume_end}, - #{?snk_kind := ps_worker_shutdown}, - ResumeTrace - ) - ), - - [Markers] = ?projection(markers, ?of_kind(ps_node_markers, Trace)), - ?assertMatch([_], Markers). - -to_list(L) when is_list(L) -> L; -to_list(A) when is_atom(A) -> atom_to_list(A); -to_list(B) when is_binary(B) -> binary_to_list(B). - -%%-------------------------------------------------------------------- -%% Snabbkaffe tests -%%-------------------------------------------------------------------- - -t_snabbkaffe_vanilla_stages(Config) -> - %% Test that all stages of session resume works ok in the simplest case - ConnFun = ?config(conn_fun, Config), - ClientId = ?config(client_id, Config), - EmqttOpts = [ - {proto_ver, v5}, - {clientid, ClientId}, - {properties, #{'Session-Expiry-Interval' => 30}} - | Config - ], - {ok, Client1} = emqtt:start_link([{clean_start, true} | EmqttOpts]), - {ok, _} = emqtt:ConnFun(Client1), - ok = emqtt:disconnect(Client1), - maybe_kill_connection_process(ClientId, Config), - - ?check_trace( - begin - {ok, Client2} = emqtt:start_link([{clean_start, false} | EmqttOpts]), - {ok, _} = emqtt:ConnFun(Client2), - ok = emqtt:disconnect(Client2) - end, - fun(ok, Trace) -> - check_snabbkaffe_vanilla(Trace) - end - ), - ok. - -t_snabbkaffe_pending_messages(Config) -> - %% Make sure there are pending messages are fetched during the init stage. - ConnFun = ?config(conn_fun, Config), - ClientId = ?config(client_id, Config), - Topic = ?config(topic, Config), - STopic = ?config(stopic, Config), - Payloads = [<<"test", (integer_to_binary(X))/binary>> || X <- [1, 2, 3, 4, 5]], - EmqttOpts = [ - {proto_ver, v5}, - {clientid, ClientId}, - {properties, #{'Session-Expiry-Interval' => 30}} - | Config - ], - {ok, Client1} = emqtt:start_link([{clean_start, true} | EmqttOpts]), - {ok, _} = emqtt:ConnFun(Client1), - {ok, _, [2]} = emqtt:subscribe(Client1, STopic, qos2), - ok = emqtt:disconnect(Client1), - maybe_kill_connection_process(ClientId, Config), - - ?check_trace( - begin - snabbkaffe_sync_publish(Topic, Payloads), - {ok, Client2} = emqtt:start_link([{clean_start, false} | EmqttOpts]), - {ok, _} = emqtt:ConnFun(Client2), - Msgs = receive_messages(length(Payloads)), - ReceivedPayloads = [P || #{payload := P} <- Msgs], - ?assertEqual(lists:sort(ReceivedPayloads), lists:sort(Payloads)), - ok = emqtt:disconnect(Client2) - end, - fun(ok, Trace) -> - check_snabbkaffe_vanilla(Trace), - %% Check that all messages was delivered from the DB - [Delivers1] = ?projection(msgs, ?of_kind(ps_persist_pendings_msgs, Trace)), - [Delivers2] = ?projection(msgs, ?of_kind(ps_marker_pendings_msgs, Trace)), - Delivers = Delivers1 ++ Delivers2, - ?assertEqual(length(Payloads), length(Delivers)), - %% Check for no duplicates - ?assertEqual(lists:usort(Delivers), lists:sort(Delivers)) - end - ), - ok. - -t_snabbkaffe_buffered_messages(Config) -> - %% Make sure to buffer messages during startup. - ConnFun = ?config(conn_fun, Config), - ClientId = ?config(client_id, Config), - Topic = ?config(topic, Config), - STopic = ?config(stopic, Config), - Payloads1 = [<<"test", (integer_to_binary(X))/binary>> || X <- [1, 2, 3]], - Payloads2 = [<<"test", (integer_to_binary(X))/binary>> || X <- [4, 5, 6]], - EmqttOpts = [ - {proto_ver, v5}, - {clientid, ClientId}, - {properties, #{'Session-Expiry-Interval' => 30}} - | Config - ], - {ok, Client1} = emqtt:start_link([{clean_start, true} | EmqttOpts]), - {ok, _} = emqtt:ConnFun(Client1), - {ok, _, [2]} = emqtt:subscribe(Client1, STopic, qos2), - ok = emqtt:disconnect(Client1), - maybe_kill_connection_process(ClientId, Config), - - publish(Topic, Payloads1), - - ?check_trace( - begin - %% Make the resume init phase wait until the first message is delivered. - ?force_ordering( - #{?snk_kind := ps_worker_deliver}, - #{?snk_kind := ps_resume_end} - ), - Parent = self(), - spawn_link(fun() -> - ?block_until(#{?snk_kind := ps_marker_pendings_msgs}, infinity, 5000), - publish(Topic, Payloads2, true), - Parent ! publish_done, - ok - end), - {ok, Client2} = emqtt:start_link([{clean_start, false} | EmqttOpts]), - {ok, _} = emqtt:ConnFun(Client2), - receive - publish_done -> ok - after 10000 -> error(too_long_to_publish) - end, - Msgs = receive_messages(length(Payloads1) + length(Payloads2) + 1), - ReceivedPayloads = [P || #{payload := P} <- Msgs], - ?assertEqual( - lists:sort(Payloads1 ++ Payloads2), - lists:sort(ReceivedPayloads) - ), - ok = emqtt:disconnect(Client2) - end, - fun(ok, Trace) -> - check_snabbkaffe_vanilla(Trace), - %% Check that some messages was buffered in the writer process - [Msgs] = ?projection(msgs, ?of_kind(ps_writer_pendings, Trace)), - ?assertMatch( - X when 0 < X andalso X =< length(Payloads2), - length(Msgs) - ) - end - ), - ok. - -%%-------------------------------------------------------------------- -%% GC tests -%%-------------------------------------------------------------------- - --define(MARKER, 3). --define(DELIVERED, 2). --define(UNDELIVERED, 1). --define(ABANDONED, 0). - -msg_id() -> - emqx_guid:gen(). - -delivered_msg(MsgId, SessionID, STopic) -> - {SessionID, MsgId, STopic, ?DELIVERED}. - -undelivered_msg(MsgId, SessionID, STopic) -> - {SessionID, MsgId, STopic, ?UNDELIVERED}. - -marker_msg(MarkerID, SessionID) -> - {SessionID, MarkerID, <<>>, ?MARKER}. - -guid(MicrosecondsAgo) -> - %% Make a fake GUID and set a timestamp. - <> = emqx_guid:gen(), - <<(TS - MicrosecondsAgo):64, Tail:64>>. - -abandoned_session_msg(SessionID) -> - abandoned_session_msg(SessionID, 0). - -abandoned_session_msg(SessionID, MicrosecondsAgo) -> - TS = erlang:system_time(microsecond), - {SessionID, <<>>, <<(TS - MicrosecondsAgo):64>>, ?ABANDONED}. - -fresh_gc_delete_fun() -> - Ets = ets:new(gc_collect, [ordered_set]), - fun - (delete, Key) -> - ets:insert(Ets, {Key}), - ok; - (collect, <<>>) -> - List = ets:match(Ets, {'$1'}), - ets:delete(Ets), - lists:append(List); - (_, _Key) -> - ok - end. - -fresh_gc_callbacks_fun() -> - Ets = ets:new(gc_collect, [ordered_set]), - fun - (collect, <<>>) -> - List = ets:match(Ets, {'$1'}), - ets:delete(Ets), - lists:append(List); - (Tag, Key) -> - ets:insert(Ets, {{Key, Tag}}), - ok - end. - -get_gc_delete_messages() -> - Fun = fresh_gc_delete_fun(), - emqx_persistent_session:gc_session_messages(Fun), - Fun(collect, <<>>). - -get_gc_callbacks() -> - Fun = fresh_gc_callbacks_fun(), - emqx_persistent_session:gc_session_messages(Fun), - Fun(collect, <<>>). - -t_gc_all_delivered(Config) -> - Store = ?config(session_msg_store, Config), - STopic = ?config(stopic, Config), - SessionId = emqx_guid:gen(), - MsgIds = [msg_id() || _ <- lists:seq(1, 5)], - Delivered = [delivered_msg(X, SessionId, STopic) || X <- MsgIds], - Undelivered = [undelivered_msg(X, SessionId, STopic) || X <- MsgIds], - SortedContent = lists:usort(Delivered ++ Undelivered), - ets:insert(Store, [{X, <<>>} || X <- SortedContent]), - GCMessages = get_gc_delete_messages(), - ?assertEqual(SortedContent, GCMessages), - ok. - -t_gc_some_undelivered(Config) -> - Store = ?config(session_msg_store, Config), - STopic = ?config(stopic, Config), - SessionId = emqx_guid:gen(), - MsgIds = [msg_id() || _ <- lists:seq(1, 10)], - Delivered = [delivered_msg(X, SessionId, STopic) || X <- MsgIds], - {Delivered1, _Delivered2} = split(Delivered), - Undelivered = [undelivered_msg(X, SessionId, STopic) || X <- MsgIds], - {Undelivered1, Undelivered2} = split(Undelivered), - Content = Delivered1 ++ Undelivered1 ++ Undelivered2, - ets:insert(Store, [{X, <<>>} || X <- Content]), - Expected = lists:usort(Delivered1 ++ Undelivered1), - GCMessages = get_gc_delete_messages(), - ?assertEqual(Expected, GCMessages), - ok. - -t_gc_with_markers(Config) -> - Store = ?config(session_msg_store, Config), - STopic = ?config(stopic, Config), - SessionId = emqx_guid:gen(), - MsgIds1 = [msg_id() || _ <- lists:seq(1, 10)], - MarkerId = msg_id(), - MsgIds = [msg_id() || _ <- lists:seq(1, 4)] ++ MsgIds1, - Delivered = [delivered_msg(X, SessionId, STopic) || X <- MsgIds], - {Delivered1, _Delivered2} = split(Delivered), - Undelivered = [undelivered_msg(X, SessionId, STopic) || X <- MsgIds], - {Undelivered1, Undelivered2} = split(Undelivered), - Markers = [marker_msg(MarkerId, SessionId)], - Content = Delivered1 ++ Undelivered1 ++ Undelivered2 ++ Markers, - ets:insert(Store, [{X, <<>>} || X <- Content]), - Expected = lists:usort(Delivered1 ++ Undelivered1), - GCMessages = get_gc_delete_messages(), - ?assertEqual(Expected, GCMessages), - ok. - -t_gc_abandoned_some_undelivered(Config) -> - Store = ?config(session_msg_store, Config), - STopic = ?config(stopic, Config), - SessionId = emqx_guid:gen(), - MsgIds = [msg_id() || _ <- lists:seq(1, 10)], - Delivered = [delivered_msg(X, SessionId, STopic) || X <- MsgIds], - {Delivered1, _Delivered2} = split(Delivered), - Undelivered = [undelivered_msg(X, SessionId, STopic) || X <- MsgIds], - {Undelivered1, Undelivered2} = split(Undelivered), - Abandoned = abandoned_session_msg(SessionId), - Content = Delivered1 ++ Undelivered1 ++ Undelivered2 ++ [Abandoned], - ets:insert(Store, [{X, <<>>} || X <- Content]), - Expected = lists:usort(Delivered1 ++ Undelivered1 ++ Undelivered2), - GCMessages = get_gc_delete_messages(), - ?assertEqual(Expected, GCMessages), - ok. - -t_gc_abandoned_only_called_on_empty_session(Config) -> - Store = ?config(session_msg_store, Config), - STopic = ?config(stopic, Config), - SessionId = emqx_guid:gen(), - MsgIds = [msg_id() || _ <- lists:seq(1, 10)], - Delivered = [delivered_msg(X, SessionId, STopic) || X <- MsgIds], - Undelivered = [undelivered_msg(X, SessionId, STopic) || X <- MsgIds], - Abandoned = abandoned_session_msg(SessionId), - Content = Delivered ++ Undelivered ++ [Abandoned], - ets:insert(Store, [{X, <<>>} || X <- Content]), - GCMessages = get_gc_callbacks(), - - %% Since we had messages to delete, we don't expect to get the - %% callback on the abandoned session - ?assertEqual([], [X || {X, abandoned} <- GCMessages]), - - %% But if we have only the abandoned session marker for this - %% session, it should be called. - ets:delete_all_objects(Store), - UndeliveredOtherSession = undelivered_msg(msg_id(), emqx_guid:gen(), <<"topic">>), - ets:insert(Store, [{X, <<>>} || X <- [Abandoned, UndeliveredOtherSession]]), - GCMessages2 = get_gc_callbacks(), - ?assertEqual([Abandoned], [X || {X, abandoned} <- GCMessages2]), - ok. - -t_gc_session_gc_worker(init, Config) -> - meck:new(emqx_persistent_session, [passthrough, no_link]), - Config; -t_gc_session_gc_worker('end', _Config) -> - meck:unload(emqx_persistent_session), - ok. - -t_gc_session_gc_worker(Config) -> - STopic = ?config(stopic, Config), - SessionID = emqx_guid:gen(), - MsgDeleted = delivered_msg(msg_id(), SessionID, STopic), - MarkerNotDeleted = marker_msg(msg_id(), SessionID), - MarkerDeleted = marker_msg(guid(120 * 1000 * 1000), SessionID), - AbandonedNotDeleted = abandoned_session_msg(SessionID), - AbandonedDeleted = abandoned_session_msg(SessionID, 500 * 1000 * 1000), - meck:expect(emqx_persistent_session, delete_session_message, fun(_Key) -> ok end), - emqx_persistent_session_gc:session_gc_worker(delete, MsgDeleted), - emqx_persistent_session_gc:session_gc_worker(marker, MarkerNotDeleted), - emqx_persistent_session_gc:session_gc_worker(marker, MarkerDeleted), - emqx_persistent_session_gc:session_gc_worker(abandoned, AbandonedDeleted), - emqx_persistent_session_gc:session_gc_worker(abandoned, AbandonedNotDeleted), - History = meck:history(emqx_persistent_session, self()), - DeleteCalls = [ - Key - || {_Pid, {_, delete_session_message, [Key]}, _Result} <- - History - ], - ?assertEqual( - lists:sort([MsgDeleted, AbandonedDeleted, MarkerDeleted]), - lists:sort(DeleteCalls) - ), - ok. - -t_gc_message_gc(Config) -> - Topic = ?config(topic, Config), - ClientID = ?config(client_id, Config), - Store = ?config(msg_store, Config), - NewMsgs = [ - emqx_message:make(ClientID, Topic, integer_to_binary(P)) - || P <- lists:seq(6, 10) - ], - Retain = 60 * 1000, - emqx_config:put(?msg_retain, Retain), - Msgs1 = [ - emqx_message:make(ClientID, Topic, integer_to_binary(P)) - || P <- lists:seq(1, 5) - ], - OldMsgs = [M#message{id = guid(Retain * 1000)} || M <- Msgs1], - ets:insert(Store, NewMsgs ++ OldMsgs), - ?assertEqual(lists:sort(OldMsgs ++ NewMsgs), ets:tab2list(Store)), - ok = emqx_persistent_session_gc:message_gc_worker(), - ?assertEqual(lists:sort(NewMsgs), ets:tab2list(Store)), - ok. - -split(List) -> - split(List, [], []). - -split([], L1, L2) -> - {L1, L2}; -split([H], L1, L2) -> - {[H | L1], L2}; -split([H1, H2 | Left], L1, L2) -> - split(Left, [H1 | L1], [H2 | L2]). diff --git a/apps/emqx_eviction_agent/test/emqx_eviction_agent_channel_SUITE.erl b/apps/emqx_eviction_agent/test/emqx_eviction_agent_channel_SUITE.erl index 1d77fe170..b4d7ceb08 100644 --- a/apps/emqx_eviction_agent/test/emqx_eviction_agent_channel_SUITE.erl +++ b/apps/emqx_eviction_agent/test/emqx_eviction_agent_channel_SUITE.erl @@ -29,16 +29,6 @@ init_per_suite(Config) -> end_per_suite(_Config) -> emqx_common_test_helpers:stop_apps([emqx_eviction_agent, emqx_conf]). -init_per_testcase(t_persistence, _Config) -> - {skip, "Existing session persistence implementation is being phased out"}; -init_per_testcase(_TestCase, Config) -> - Config. - -end_per_testcase(t_persistence, Config) -> - Config; -end_per_testcase(_TestCase, _Config) -> - ok. - %%-------------------------------------------------------------------- %% Tests %%-------------------------------------------------------------------- @@ -199,40 +189,6 @@ t_get_connected_client_count(_Config) -> emqx_cm:get_connected_client_count() ). -t_persistence(_Config) -> - erlang:process_flag(trap_exit, true), - - Topic = <<"t1">>, - Message = <<"message_to_persist">>, - - {ok, C0} = emqtt_connect(?CLIENT_ID, false), - {ok, _, _} = emqtt:subscribe(C0, Topic, 0), - - Opts = evict_session_opts(?CLIENT_ID), - {ok, Pid} = emqx_eviction_agent_channel:start_supervised(Opts), - - {ok, C1} = emqtt_connect(), - {ok, _} = emqtt:publish(C1, Topic, Message, 1), - ok = emqtt:disconnect(C1), - - %% Kill channel so that the session is only persisted - ok = emqx_eviction_agent_channel:call(Pid, kick), - - %% Should restore session from persistents storage and receive messages - {ok, C2} = emqtt_connect(?CLIENT_ID, false), - - receive - {publish, #{ - payload := Message, - topic := Topic - }} -> - ok - after 1000 -> - ct:fail("message not received") - end, - - ok = emqtt:disconnect(C2). - %%-------------------------------------------------------------------- %% Helpers %%--------------------------------------------------------------------