refactor: Move deliver and persist out of server context

This commit is contained in:
Tobias Lindahl 2021-07-02 14:03:12 +02:00
parent 1c274b15d2
commit 31cb1179af
1 changed files with 12 additions and 7 deletions

View File

@ -181,14 +181,23 @@ persist(Msg) ->
case match_routes(emqx_message:topic(Msg)) of
[] -> ok;
Routes ->
%% TODO: This should store in external backend
ekka_mnesia:dirty_write(?MSG_TAB, Msg),
Fun = fun(Route) -> cast(pick(Route), {persist, Route, Msg}) end,
MsgId = emqx_message:id(Msg),
Fun = fun(#route{dest = SessionID}) ->
Key = {SessionID, MsgId, ?UNDELIVERED},
ekka_mnesia:dirty_write(?SESS_MSG_TAB, #session_msg{ key = Key })
end,
lists:foreach(Fun, Routes)
end
end.
delivered(SessionID, MsgIds) ->
cast(pick(SessionID), {delivered, SessionID, MsgIds}).
delivered(SessionID, MsgIDs) ->
Fun = fun(MsgID) ->
Key = {SessionID, MsgID, ?DELIVERED},
ekka_mnesia:dirty_write(?SESS_MSG_TAB, #session_msg{ key = Key })
end,
lists:foreach(Fun, MsgIDs),
pending(SessionID) ->
call(pick(SessionID), {pending, SessionID}).
@ -218,10 +227,6 @@ handle_call(Req, _From, State) ->
?LOG(error, "Unexpected call: ~p", [Req]),
{reply, ignored, State}.
handle_cast({persist, #route{dest = SessionID}, Msg}, State) ->
Key = {SessionID, emqx_message:id(Msg), ?UNDELIVERED},
ekka_mnesia:dirty_write(?SESS_MSG_TAB, #session_msg{ key = Key }),
{noreply, State};
handle_cast({delivered, SessionID, MsgIDs}, State) ->
Fun = fun(MsgID) ->
Key = {SessionID, MsgID, ?DELIVERED},