Remove the unnecessary transactions to optimize session management
This commit is contained in:
parent
d2a4e2c615
commit
c308037b1a
|
@ -156,7 +156,7 @@ handle_call({start_session, false, {ClientId, Username}, ClientPid}, _From, Stat
|
||||||
{reply, {ok, SessPid, true}, State};
|
{reply, {ok, SessPid, true}, State};
|
||||||
{error, Erorr} ->
|
{error, Erorr} ->
|
||||||
{reply, {error, Erorr}, State}
|
{reply, {error, Erorr}, State}
|
||||||
end
|
end
|
||||||
end;
|
end;
|
||||||
|
|
||||||
%% Transient Session
|
%% Transient Session
|
||||||
|
@ -183,16 +183,14 @@ handle_cast(Msg, State) ->
|
||||||
handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) ->
|
handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) ->
|
||||||
case dict:find(MRef, State#state.monitors) of
|
case dict:find(MRef, State#state.monitors) of
|
||||||
{ok, ClientId} ->
|
{ok, ClientId} ->
|
||||||
mnesia:transaction(fun() ->
|
case mnesia:dirty_read({mqtt_session, ClientId}) of
|
||||||
case mnesia:wread({mqtt_session, ClientId}) of
|
[] ->
|
||||||
[] ->
|
ok;
|
||||||
ok;
|
[Sess = #mqtt_session{sess_pid = DownPid}] ->
|
||||||
[Sess = #mqtt_session{sess_pid = DownPid}] ->
|
mnesia:dirty_delete_object(Sess);
|
||||||
mnesia:delete_object(mqtt_session, Sess, write);
|
[_Sess] ->
|
||||||
[_Sess] ->
|
ok
|
||||||
ok
|
end,
|
||||||
end
|
|
||||||
end),
|
|
||||||
{noreply, erase_monitor(MRef, State), hibernate};
|
{noreply, erase_monitor(MRef, State), hibernate};
|
||||||
error ->
|
error ->
|
||||||
lager:error("MRef of session ~p not found", [DownPid]),
|
lager:error("MRef of session ~p not found", [DownPid]),
|
||||||
|
@ -283,15 +281,14 @@ destroy_session(Session = #mqtt_session{client_id = ClientId, sess_pid = SessPi
|
||||||
remove_session(Session);
|
remove_session(Session);
|
||||||
|
|
||||||
%% Remote node
|
%% Remote node
|
||||||
destroy_session(Session = #mqtt_session{client_id = ClientId,
|
destroy_session(Session = #mqtt_session{client_id = ClientId, sess_pid = SessPid}) ->
|
||||||
sess_pid = SessPid}) ->
|
|
||||||
Node = node(SessPid),
|
Node = node(SessPid),
|
||||||
case rpc:call(Node, emqttd_session, destroy, [SessPid, ClientId]) of
|
case rpc:call(Node, emqttd_session, destroy, [SessPid, ClientId]) of
|
||||||
ok ->
|
ok ->
|
||||||
remove_session(Session);
|
remove_session(Session);
|
||||||
{badrpc, nodedown} ->
|
{badrpc, nodedown} ->
|
||||||
?LOG(error, "Node '~s' down", [Node], Session),
|
?LOG(error, "Node '~s' down", [Node], Session),
|
||||||
remove_session(Session);
|
remove_session(Session);
|
||||||
{badrpc, Reason} ->
|
{badrpc, Reason} ->
|
||||||
?LOG(error, "Failed to destory ~p on remote node ~p for ~s",
|
?LOG(error, "Failed to destory ~p on remote node ~p for ~s",
|
||||||
[SessPid, Node, Reason], Session),
|
[SessPid, Node, Reason], Session),
|
||||||
|
@ -299,10 +296,7 @@ destroy_session(Session = #mqtt_session{client_id = ClientId,
|
||||||
end.
|
end.
|
||||||
|
|
||||||
remove_session(Session) ->
|
remove_session(Session) ->
|
||||||
case mnesia:transaction(fun mnesia:delete_object/1, [Session]) of
|
mnesia:dirty_delete_object(Session).
|
||||||
{atomic, ok} -> ok;
|
|
||||||
{aborted, Error} -> {error, Error}
|
|
||||||
end.
|
|
||||||
|
|
||||||
monitor_session(ClientId, SessPid, State = #state{monitors = Monitors}) ->
|
monitor_session(ClientId, SessPid, State = #state{monitors = Monitors}) ->
|
||||||
MRef = erlang:monitor(process, SessPid),
|
MRef = erlang:monitor(process, SessPid),
|
||||||
|
|
|
@ -24,8 +24,6 @@
|
||||||
|
|
||||||
-include("emqttd.hrl").
|
-include("emqttd.hrl").
|
||||||
|
|
||||||
-define(SM, emqttd_sm).
|
|
||||||
|
|
||||||
-define(HELPER, emqttd_sm_helper).
|
-define(HELPER, emqttd_sm_helper).
|
||||||
|
|
||||||
%% API
|
%% API
|
||||||
|
@ -44,11 +42,11 @@ init([]) ->
|
||||||
%% Helper
|
%% Helper
|
||||||
StatsFun = emqttd_stats:statsfun('sessions/count', 'sessions/max'),
|
StatsFun = emqttd_stats:statsfun('sessions/count', 'sessions/max'),
|
||||||
Helper = {?HELPER, {?HELPER, start_link, [StatsFun]},
|
Helper = {?HELPER, {?HELPER, start_link, [StatsFun]},
|
||||||
permanent, 5000, worker, [?HELPER]},
|
permanent, 5000, worker, [?HELPER]},
|
||||||
|
|
||||||
%% SM Pool Sup
|
%% SM Pool Sup
|
||||||
MFA = {?SM, start_link, []},
|
MFA = {emqttd_sm, start_link, []},
|
||||||
PoolSup = emqttd_pool_sup:spec([?SM, hash, erlang:system_info(schedulers), MFA]),
|
PoolSup = emqttd_pool_sup:spec([emqttd_sm, hash, erlang:system_info(schedulers), MFA]),
|
||||||
|
|
||||||
{ok, {{one_for_all, 10, 3600}, [Helper, PoolSup]}}.
|
{ok, {{one_for_all, 10, 3600}, [Helper, PoolSup]}}.
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue