From 43813e29c2480c87f2c35e9ecfa3c28bea9b6614 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Wed, 6 Dec 2023 10:52:42 +0100 Subject: [PATCH] chore(emqx): drop legacy session persistence impl leftovers --- apps/emqx/src/emqx_router_utils.erl | 30 +--- apps/emqx/src/emqx_session_router_worker.erl | 163 ------------------ .../src/emqx_session_router_worker_sup.erl | 64 ------- 3 files changed, 2 insertions(+), 255 deletions(-) delete mode 100644 apps/emqx/src/emqx_session_router_worker.erl delete mode 100644 apps/emqx/src/emqx_session_router_worker_sup.erl diff --git a/apps/emqx/src/emqx_router_utils.erl b/apps/emqx/src/emqx_router_utils.erl index 9fab86d37..146d9db52 100644 --- a/apps/emqx/src/emqx_router_utils.erl +++ b/apps/emqx/src/emqx_router_utils.erl @@ -19,18 +19,11 @@ -include("emqx.hrl"). -export([ - delete_direct_route/2, delete_trie_route/2, - delete_session_trie_route/2, - insert_direct_route/2, insert_trie_route/2, - insert_session_trie_route/2, maybe_trans/3 ]). -insert_direct_route(Tab, Route) -> - mria:dirty_write(Tab, Route). - insert_trie_route(RouteTab, Route = #route{topic = Topic}) -> case mnesia:wread({RouteTab, Topic}) of [] -> emqx_trie:insert(Topic); @@ -38,31 +31,12 @@ insert_trie_route(RouteTab, Route = #route{topic = Topic}) -> end, mnesia:write(RouteTab, Route, sticky_write). -insert_session_trie_route(RouteTab, Route = #route{topic = Topic}) -> - case mnesia:wread({RouteTab, Topic}) of - [] -> emqx_trie:insert_session(Topic); - _ -> ok - end, - mnesia:write(RouteTab, Route, sticky_write). - -delete_direct_route(RouteTab, Route) -> - mria:dirty_delete_object(RouteTab, Route). - -delete_trie_route(RouteTab, Route) -> - delete_trie_route(RouteTab, Route, normal). - -delete_session_trie_route(RouteTab, Route) -> - delete_trie_route(RouteTab, Route, session). - -delete_trie_route(RouteTab, Route = #route{topic = Topic}, Type) -> +delete_trie_route(RouteTab, Route = #route{topic = Topic}) -> case mnesia:wread({RouteTab, Topic}) of [R] when R =:= Route -> %% Remove route and trie ok = mnesia:delete_object(RouteTab, Route, sticky_write), - case Type of - normal -> emqx_trie:delete(Topic); - session -> emqx_trie:delete_session(Topic) - end; + emqx_trie:delete(Topic); [_ | _] -> %% Remove route only mnesia:delete_object(RouteTab, Route, sticky_write); diff --git a/apps/emqx/src/emqx_session_router_worker.erl b/apps/emqx/src/emqx_session_router_worker.erl deleted file mode 100644 index b55305d3e..000000000 --- a/apps/emqx/src/emqx_session_router_worker.erl +++ /dev/null @@ -1,163 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - -%% @doc The session router worker is responsible for buffering -%% messages for a persistent session while it is initializing. If a -%% connection process exists for a persistent session, this process is -%% used for bridging the gap while the new connection process takes -%% over the persistent session, but if there is no such process this -%% worker takes it place. -%% -%% The workers are started on all nodes, and buffers all messages that -%% are persisted to the session message table. In the final stage of -%% the initialization, the messages are delivered and the worker is -%% terminated. - --module(emqx_session_router_worker). - --behaviour(gen_server). - -%% API --export([ - buffer/3, - pendings/1, - resume_end/3, - start_link/2 -]). - -%% gen_server callbacks --export([ - init/1, - handle_call/3, - handle_cast/2, - handle_info/2, - terminate/2 -]). - --include_lib("snabbkaffe/include/snabbkaffe.hrl"). - --record(state, { - remote_pid :: pid(), - session_id :: binary(), - session_tab :: ets:table(), - messages :: ets:table(), - buffering :: boolean() -}). - -%%%=================================================================== -%%% API -%%%=================================================================== - -start_link(SessionTab, #{} = Opts) -> - gen_server:start_link(?MODULE, Opts#{session_tab => SessionTab}, []). - -pendings(Pid) -> - gen_server:call(Pid, pendings). - -resume_end(RemotePid, Pid, _SessionID) -> - case gen_server:call(Pid, {resume_end, RemotePid}) of - {ok, EtsHandle} -> - ?tp(ps_worker_call_ok, #{ - pid => Pid, - remote_pid => RemotePid, - sid => _SessionID - }), - {ok, ets:tab2list(EtsHandle)}; - {error, _} = Err -> - ?tp(ps_worker_call_failed, #{ - pid => Pid, - remote_pid => RemotePid, - sid => _SessionID, - reason => Err - }), - Err - end. - -buffer(Worker, STopic, Msg) -> - Worker ! {buffer, STopic, Msg}, - ok. - -%%%=================================================================== -%%% gen_server callbacks -%%%=================================================================== - -init(#{ - remote_pid := RemotePid, - session_id := SessionID, - session_tab := SessionTab -}) -> - process_flag(trap_exit, true), - erlang:monitor(process, RemotePid), - ?tp(ps_worker_started, #{ - remote_pid => RemotePid, - sid => SessionID - }), - {ok, #state{ - remote_pid = RemotePid, - session_id = SessionID, - session_tab = SessionTab, - messages = ets:new(?MODULE, [protected, ordered_set]), - buffering = true - }}. - -handle_call(pendings, _From, State) -> - %% Debug API - {reply, {State#state.messages, State#state.remote_pid}, State}; -handle_call({resume_end, RemotePid}, _From, #state{remote_pid = RemotePid} = State) -> - ?tp(ps_worker_resume_end, #{sid => State#state.session_id}), - {reply, {ok, State#state.messages}, State#state{buffering = false}}; -handle_call({resume_end, _RemotePid}, _From, State) -> - ?tp(ps_worker_resume_end_error, #{sid => State#state.session_id}), - {reply, {error, wrong_remote_pid}, State}; -handle_call(_Request, _From, State) -> - Reply = ok, - {reply, Reply, State}. - -handle_cast(_Request, State) -> - {noreply, State}. - -handle_info({buffer, _STopic, _Msg}, State) when not State#state.buffering -> - ?tp(ps_worker_drop_deliver, #{ - sid => State#state.session_id, - msg_id => emqx_message:id(_Msg) - }), - {noreply, State}; -handle_info({buffer, STopic, Msg}, State) when State#state.buffering -> - ?tp(ps_worker_deliver, #{ - sid => State#state.session_id, - msg_id => emqx_message:id(Msg) - }), - ets:insert(State#state.messages, {{Msg, STopic}}), - {noreply, State}; -handle_info({'DOWN', _, process, RemotePid, _Reason}, #state{remote_pid = RemotePid} = State) -> - ?tp(warning, ps_worker, #{ - event => worker_remote_died, - sid => State#state.session_id, - msg => "Remote pid died. Exiting." - }), - {stop, normal, State}; -handle_info(_Info, State) -> - {noreply, State}. - -terminate(shutdown, _State) -> - ?tp(ps_worker_shutdown, #{sid => _State#state.session_id}), - ok; -terminate(_, _State) -> - ok. - -%%%=================================================================== -%%% Internal functions -%%%=================================================================== diff --git a/apps/emqx/src/emqx_session_router_worker_sup.erl b/apps/emqx/src/emqx_session_router_worker_sup.erl deleted file mode 100644 index 473f9ccd2..000000000 --- a/apps/emqx/src/emqx_session_router_worker_sup.erl +++ /dev/null @@ -1,64 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_session_router_worker_sup). - --behaviour(supervisor). - --export([start_link/1]). - --export([ - abort_worker/1, - start_worker/2 -]). - --export([init/1]). - -start_link(SessionTab) -> - supervisor:start_link({local, ?MODULE}, ?MODULE, SessionTab). - -start_worker(SessionID, RemotePid) -> - supervisor:start_child(?MODULE, [ - #{ - session_id => SessionID, - remote_pid => RemotePid - } - ]). - -abort_worker(Pid) -> - supervisor:terminate_child(?MODULE, Pid). - -%%-------------------------------------------------------------------- -%% Supervisor callbacks -%%-------------------------------------------------------------------- - -init(SessionTab) -> - %% Resume worker - Worker = #{ - id => session_router_worker, - start => {emqx_session_router_worker, start_link, [SessionTab]}, - restart => transient, - shutdown => 2000, - type => worker, - modules => [emqx_session_router_worker] - }, - Spec = #{ - strategy => simple_one_for_one, - intensity => 1, - period => 5 - }, - - {ok, {Spec, [Worker]}}.