diff --git a/src/emqttd_protocol.erl b/src/emqttd_protocol.erl index b30978ac6..384f93225 100644 --- a/src/emqttd_protocol.erl +++ b/src/emqttd_protocol.erl @@ -213,7 +213,7 @@ process(?CONNECT_PACKET(Var), State0) -> %% ACCEPT {?CONNACK_ACCEPT, SP, State2#proto_state{session = Session, is_superuser = IsSuperuser}}; {error, Error} -> - exit({shutdown, Error}) + {stop, {shutdown, Error}, State2} end; {error, Reason}-> ?LOG(error, "Username '~s' login failed for ~p", [Username, Reason], State1), @@ -381,12 +381,12 @@ stop_if_auth_failure(_RC, State) -> shutdown(_Error, #proto_state{client_id = undefined}) -> ignore; - -shutdown(conflict, #proto_state{client_id = _ClientId}) -> +shutdown(conflict, _State) -> + %% let it down + ignore; +shutdown(mnesia_conflict, _State) -> %% let it down - %% emqttd_cm:unreg(ClientId); ignore; - shutdown(Error, State = #proto_state{will_msg = WillMsg}) -> ?LOG(debug, "Shutdown for ~p", [Error], State), Client = client(State), @@ -552,7 +552,7 @@ sp(false) -> 0. clean_retain(false, Msg = #mqtt_message{retain = true}) -> Msg#mqtt_message{retain = false}; -clean_retain(true, Msg) -> +clean_retain(_IsBridge, Msg) -> Msg. %%-------------------------------------------------------------------- diff --git a/src/emqttd_router.erl b/src/emqttd_router.erl index 4d4e22160..fa1a0c70c 100644 --- a/src/emqttd_router.erl +++ b/src/emqttd_router.erl @@ -284,5 +284,7 @@ clean_routes_(Node) -> mnesia:transaction(Clean). update_stats_() -> - emqttd_stats:setstats('routes/count', 'routes/max', mnesia:table_info(mqtt_route, size)). + Size = mnesia:table_info(mqtt_route, size), + emqttd_stats:setstats('routes/count', 'routes/max', Size), + emqttd_stats:setstats('topics/count', 'topics/max', Size). diff --git a/src/emqttd_sm.erl b/src/emqttd_sm.erl index 054e81c95..a46d56fa6 100644 --- a/src/emqttd_sm.erl +++ b/src/emqttd_sm.erl @@ -156,7 +156,7 @@ handle_call({start_session, false, {ClientId, Username}, ClientPid}, _From, Stat {reply, {ok, SessPid, true}, State}; {error, Erorr} -> {reply, {error, Erorr}, State} - end + end end; %% Transient Session @@ -183,16 +183,14 @@ handle_cast(Msg, State) -> handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) -> case dict:find(MRef, State#state.monitors) of {ok, ClientId} -> - mnesia:transaction(fun() -> - case mnesia:wread({mqtt_session, ClientId}) of - [] -> - ok; - [Sess = #mqtt_session{sess_pid = DownPid}] -> - mnesia:delete_object(mqtt_session, Sess, write); - [_Sess] -> - ok - end - end), + case mnesia:dirty_read({mqtt_session, ClientId}) of + [] -> + ok; + [Sess = #mqtt_session{sess_pid = DownPid}] -> + mnesia:dirty_delete_object(Sess); + [_Sess] -> + ok + end, {noreply, erase_monitor(MRef, State), hibernate}; error -> lager:error("MRef of session ~p not found", [DownPid]), @@ -216,8 +214,7 @@ code_change(_OldVsn, State, _Extra) -> create_session({CleanSess, {ClientId, Username}, ClientPid}, State) -> case create_session(CleanSess, {ClientId, Username}, ClientPid) of {ok, SessPid} -> - {reply, {ok, SessPid, false}, - monitor_session(ClientId, SessPid, State)}; + {reply, {ok, SessPid, false}, monitor_session(ClientId, SessPid, State)}; {error, Error} -> {reply, {error, Error}, State} end. @@ -284,15 +281,14 @@ destroy_session(Session = #mqtt_session{client_id = ClientId, sess_pid = SessPi remove_session(Session); %% Remote node -destroy_session(Session = #mqtt_session{client_id = ClientId, - sess_pid = SessPid}) -> +destroy_session(Session = #mqtt_session{client_id = ClientId, sess_pid = SessPid}) -> Node = node(SessPid), case rpc:call(Node, emqttd_session, destroy, [SessPid, ClientId]) of ok -> remove_session(Session); {badrpc, nodedown} -> ?LOG(error, "Node '~s' down", [Node], Session), - remove_session(Session); + remove_session(Session); {badrpc, Reason} -> ?LOG(error, "Failed to destory ~p on remote node ~p for ~s", [SessPid, Node, Reason], Session), @@ -300,10 +296,7 @@ destroy_session(Session = #mqtt_session{client_id = ClientId, end. remove_session(Session) -> - case mnesia:transaction(fun mnesia:delete_object/1, [Session]) of - {atomic, ok} -> ok; - {aborted, Error} -> {error, Error} - end. + mnesia:dirty_delete_object(Session). monitor_session(ClientId, SessPid, State = #state{monitors = Monitors}) -> MRef = erlang:monitor(process, SessPid), diff --git a/src/emqttd_sm_sup.erl b/src/emqttd_sm_sup.erl index efabadb96..1c2e7f31a 100644 --- a/src/emqttd_sm_sup.erl +++ b/src/emqttd_sm_sup.erl @@ -24,8 +24,6 @@ -include("emqttd.hrl"). --define(SM, emqttd_sm). - -define(HELPER, emqttd_sm_helper). %% API @@ -44,11 +42,11 @@ init([]) -> %% Helper StatsFun = emqttd_stats:statsfun('sessions/count', 'sessions/max'), Helper = {?HELPER, {?HELPER, start_link, [StatsFun]}, - permanent, 5000, worker, [?HELPER]}, + permanent, 5000, worker, [?HELPER]}, %% SM Pool Sup - MFA = {?SM, start_link, []}, - PoolSup = emqttd_pool_sup:spec([?SM, hash, erlang:system_info(schedulers), MFA]), + MFA = {emqttd_sm, start_link, []}, + PoolSup = emqttd_pool_sup:spec([emqttd_sm, hash, erlang:system_info(schedulers), MFA]), {ok, {{one_for_all, 10, 3600}, [Helper, PoolSup]}}. diff --git a/src/emqttd_stats.erl b/src/emqttd_stats.erl index 73a1471ac..6d84395e2 100644 --- a/src/emqttd_stats.erl +++ b/src/emqttd_stats.erl @@ -57,14 +57,14 @@ %% $SYS Topics for Subscribers -define(SYSTOP_PUBSUB, [ - 'routes/count', % ... - 'routes/max', % ... 'topics/count', % ... 'topics/max', % ... 'subscribers/count', % ... 'subscribers/max', % ... 'subscriptions/count', % ... - 'subscriptions/max' % ... + 'subscriptions/max', % ... + 'routes/count', % ... + 'routes/max' % ... ]). %% $SYS Topic for retained