From 31cb1179afdebe43f7bb98c3f4087894bb588482 Mon Sep 17 00:00:00 2001 From: Tobias Lindahl Date: Fri, 2 Jul 2021 14:03:12 +0200 Subject: [PATCH] refactor: Move deliver and persist out of server context --- apps/emqx/src/emqx_session_router.erl | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/apps/emqx/src/emqx_session_router.erl b/apps/emqx/src/emqx_session_router.erl index 8e88a8244..d123bd5f3 100644 --- a/apps/emqx/src/emqx_session_router.erl +++ b/apps/emqx/src/emqx_session_router.erl @@ -181,14 +181,23 @@ persist(Msg) -> case match_routes(emqx_message:topic(Msg)) of [] -> ok; Routes -> + %% TODO: This should store in external backend ekka_mnesia:dirty_write(?MSG_TAB, Msg), - Fun = fun(Route) -> cast(pick(Route), {persist, Route, Msg}) end, + MsgId = emqx_message:id(Msg), + Fun = fun(#route{dest = SessionID}) -> + Key = {SessionID, MsgId, ?UNDELIVERED}, + ekka_mnesia:dirty_write(?SESS_MSG_TAB, #session_msg{ key = Key }) + end, lists:foreach(Fun, Routes) end end. -delivered(SessionID, MsgIds) -> - cast(pick(SessionID), {delivered, SessionID, MsgIds}). +delivered(SessionID, MsgIDs) -> + Fun = fun(MsgID) -> + Key = {SessionID, MsgID, ?DELIVERED}, + ekka_mnesia:dirty_write(?SESS_MSG_TAB, #session_msg{ key = Key }) + end, + lists:foreach(Fun, MsgIDs), pending(SessionID) -> call(pick(SessionID), {pending, SessionID}). @@ -218,10 +227,6 @@ handle_call(Req, _From, State) -> ?LOG(error, "Unexpected call: ~p", [Req]), {reply, ignored, State}. -handle_cast({persist, #route{dest = SessionID}, Msg}, State) -> - Key = {SessionID, emqx_message:id(Msg), ?UNDELIVERED}, - ekka_mnesia:dirty_write(?SESS_MSG_TAB, #session_msg{ key = Key }), - {noreply, State}; handle_cast({delivered, SessionID, MsgIDs}, State) -> Fun = fun(MsgID) -> Key = {SessionID, MsgID, ?DELIVERED},