mqtt_session and mqtt_local_session

This commit is contained in:
Feng Lee 2016-08-10 21:45:37 +08:00
parent c67a30bf7b
commit d6041395ef
2 changed files with 13 additions and 12 deletions

View File

@ -85,7 +85,7 @@ start_session(CleanSess, ClientId) ->
%% @doc Lookup a Session %% @doc Lookup a Session
-spec(lookup_session(binary()) -> mqtt_session() | undefined). -spec(lookup_session(binary()) -> mqtt_session() | undefined).
lookup_session(ClientId) -> lookup_session(ClientId) ->
case mnesia:dirty_read(session, ClientId) of case mnesia:dirty_read(mqtt_session, ClientId) of
[Session] -> Session; [Session] -> Session;
[] -> undefined [] -> undefined
end. end.
@ -166,11 +166,13 @@ handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) ->
case dict:find(MRef, State#state.monitors) of case dict:find(MRef, State#state.monitors) of
{ok, ClientId} -> {ok, ClientId} ->
mnesia:transaction(fun() -> mnesia:transaction(fun() ->
case mnesia:wread({session, ClientId}) of case mnesia:wread({mqtt_session, ClientId}) of
[] -> ok; [] ->
ok;
[Sess = #mqtt_session{sess_pid = DownPid}] -> [Sess = #mqtt_session{sess_pid = DownPid}] ->
mnesia:delete_object(session, Sess, write); mnesia:delete_object(mqtt_session, Sess, write);
[_Sess] -> ok [_Sess] ->
ok
end end
end), end),
{noreply, erase_monitor(MRef, State), hibernate}; {noreply, erase_monitor(MRef, State), hibernate};
@ -221,9 +223,9 @@ create_session(CleanSess, ClientId, ClientPid) ->
insert_session(Session = #mqtt_session{client_id = ClientId}) -> insert_session(Session = #mqtt_session{client_id = ClientId}) ->
mnesia:transaction( mnesia:transaction(
fun() -> fun() ->
case mnesia:wread({session, ClientId}) of case mnesia:wread({mqtt_session, ClientId}) of
[] -> [] ->
mnesia:write(session, Session, write); mnesia:write(mqtt_session, Session, write);
[#mqtt_session{sess_pid = SessPid}] -> [#mqtt_session{sess_pid = SessPid}] ->
mnesia:abort({conflict, SessPid}) mnesia:abort({conflict, SessPid})
end end
@ -280,7 +282,7 @@ destroy_session(Session = #mqtt_session{client_id = ClientId,
end. end.
remove_session(Session) -> remove_session(Session) ->
case mnesia:transaction(fun mnesia:delete_object/3, [session, Session, write]) of case mnesia:transaction(fun mnesia:delete_object/1, [Session]) of
{atomic, ok} -> ok; {atomic, ok} -> ok;
{aborted, Error} -> {error, Error} {aborted, Error} -> {error, Error}
end. end.

View File

@ -54,10 +54,9 @@ handle_info({mnesia_system_event, {mnesia_down, Node}}, State) ->
lager:error("!!!Mnesia node down: ~s", [Node]), lager:error("!!!Mnesia node down: ~s", [Node]),
Fun = fun() -> Fun = fun() ->
ClientIds = ClientIds =
mnesia:select(session, [{#mqtt_session{client_id = '$1', sess_pid = '$2', _ = '_'}, mnesia:select(mqtt_session, [{#mqtt_session{client_id = '$1', sess_pid = '$2', _ = '_'},
[{'==', {node, '$2'}, Node}], [{'==', {node, '$2'}, Node}], ['$1']}]),
['$1']}]), lists:foreach(fun(ClientId) -> mnesia:delete({mqtt_session, ClientId}) end, ClientIds)
lists:foreach(fun(ClientId) -> mnesia:delete({session, ClientId}) end, ClientIds)
end, end,
mnesia:async_dirty(Fun), mnesia:async_dirty(Fun),
{noreply, State}; {noreply, State};