Merge branch 'develop' into ws_proxy

This commit is contained in:
HeeeJianBo 2017-12-01 22:22:20 +08:00
commit 70570dd943
5 changed files with 28 additions and 35 deletions

View File

@ -213,7 +213,7 @@ process(?CONNECT_PACKET(Var), State0) ->
%% ACCEPT %% ACCEPT
{?CONNACK_ACCEPT, SP, State2#proto_state{session = Session, is_superuser = IsSuperuser}}; {?CONNACK_ACCEPT, SP, State2#proto_state{session = Session, is_superuser = IsSuperuser}};
{error, Error} -> {error, Error} ->
exit({shutdown, Error}) {stop, {shutdown, Error}, State2}
end; end;
{error, Reason}-> {error, Reason}->
?LOG(error, "Username '~s' login failed for ~p", [Username, Reason], State1), ?LOG(error, "Username '~s' login failed for ~p", [Username, Reason], State1),
@ -381,12 +381,12 @@ stop_if_auth_failure(_RC, State) ->
shutdown(_Error, #proto_state{client_id = undefined}) -> shutdown(_Error, #proto_state{client_id = undefined}) ->
ignore; ignore;
shutdown(conflict, _State) ->
shutdown(conflict, #proto_state{client_id = _ClientId}) -> %% let it down
ignore;
shutdown(mnesia_conflict, _State) ->
%% let it down %% let it down
%% emqttd_cm:unreg(ClientId);
ignore; ignore;
shutdown(Error, State = #proto_state{will_msg = WillMsg}) -> shutdown(Error, State = #proto_state{will_msg = WillMsg}) ->
?LOG(debug, "Shutdown for ~p", [Error], State), ?LOG(debug, "Shutdown for ~p", [Error], State),
Client = client(State), Client = client(State),
@ -552,7 +552,7 @@ sp(false) -> 0.
clean_retain(false, Msg = #mqtt_message{retain = true}) -> clean_retain(false, Msg = #mqtt_message{retain = true}) ->
Msg#mqtt_message{retain = false}; Msg#mqtt_message{retain = false};
clean_retain(true, Msg) -> clean_retain(_IsBridge, Msg) ->
Msg. Msg.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -284,5 +284,7 @@ clean_routes_(Node) ->
mnesia:transaction(Clean). mnesia:transaction(Clean).
update_stats_() -> update_stats_() ->
emqttd_stats:setstats('routes/count', 'routes/max', mnesia:table_info(mqtt_route, size)). Size = mnesia:table_info(mqtt_route, size),
emqttd_stats:setstats('routes/count', 'routes/max', Size),
emqttd_stats:setstats('topics/count', 'topics/max', Size).

View File

@ -156,7 +156,7 @@ handle_call({start_session, false, {ClientId, Username}, ClientPid}, _From, Stat
{reply, {ok, SessPid, true}, State}; {reply, {ok, SessPid, true}, State};
{error, Erorr} -> {error, Erorr} ->
{reply, {error, Erorr}, State} {reply, {error, Erorr}, State}
end end
end; end;
%% Transient Session %% Transient Session
@ -183,16 +183,14 @@ handle_cast(Msg, State) ->
handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) -> handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) ->
case dict:find(MRef, State#state.monitors) of case dict:find(MRef, State#state.monitors) of
{ok, ClientId} -> {ok, ClientId} ->
mnesia:transaction(fun() -> case mnesia:dirty_read({mqtt_session, ClientId}) of
case mnesia:wread({mqtt_session, ClientId}) of [] ->
[] -> ok;
ok; [Sess = #mqtt_session{sess_pid = DownPid}] ->
[Sess = #mqtt_session{sess_pid = DownPid}] -> mnesia:dirty_delete_object(Sess);
mnesia:delete_object(mqtt_session, Sess, write); [_Sess] ->
[_Sess] -> ok
ok end,
end
end),
{noreply, erase_monitor(MRef, State), hibernate}; {noreply, erase_monitor(MRef, State), hibernate};
error -> error ->
lager:error("MRef of session ~p not found", [DownPid]), lager:error("MRef of session ~p not found", [DownPid]),
@ -216,8 +214,7 @@ code_change(_OldVsn, State, _Extra) ->
create_session({CleanSess, {ClientId, Username}, ClientPid}, State) -> create_session({CleanSess, {ClientId, Username}, ClientPid}, State) ->
case create_session(CleanSess, {ClientId, Username}, ClientPid) of case create_session(CleanSess, {ClientId, Username}, ClientPid) of
{ok, SessPid} -> {ok, SessPid} ->
{reply, {ok, SessPid, false}, {reply, {ok, SessPid, false}, monitor_session(ClientId, SessPid, State)};
monitor_session(ClientId, SessPid, State)};
{error, Error} -> {error, Error} ->
{reply, {error, Error}, State} {reply, {error, Error}, State}
end. end.
@ -284,15 +281,14 @@ destroy_session(Session = #mqtt_session{client_id = ClientId, sess_pid = SessPi
remove_session(Session); remove_session(Session);
%% Remote node %% Remote node
destroy_session(Session = #mqtt_session{client_id = ClientId, destroy_session(Session = #mqtt_session{client_id = ClientId, sess_pid = SessPid}) ->
sess_pid = SessPid}) ->
Node = node(SessPid), Node = node(SessPid),
case rpc:call(Node, emqttd_session, destroy, [SessPid, ClientId]) of case rpc:call(Node, emqttd_session, destroy, [SessPid, ClientId]) of
ok -> ok ->
remove_session(Session); remove_session(Session);
{badrpc, nodedown} -> {badrpc, nodedown} ->
?LOG(error, "Node '~s' down", [Node], Session), ?LOG(error, "Node '~s' down", [Node], Session),
remove_session(Session); remove_session(Session);
{badrpc, Reason} -> {badrpc, Reason} ->
?LOG(error, "Failed to destory ~p on remote node ~p for ~s", ?LOG(error, "Failed to destory ~p on remote node ~p for ~s",
[SessPid, Node, Reason], Session), [SessPid, Node, Reason], Session),
@ -300,10 +296,7 @@ destroy_session(Session = #mqtt_session{client_id = ClientId,
end. end.
remove_session(Session) -> remove_session(Session) ->
case mnesia:transaction(fun mnesia:delete_object/1, [Session]) of mnesia:dirty_delete_object(Session).
{atomic, ok} -> ok;
{aborted, Error} -> {error, Error}
end.
monitor_session(ClientId, SessPid, State = #state{monitors = Monitors}) -> monitor_session(ClientId, SessPid, State = #state{monitors = Monitors}) ->
MRef = erlang:monitor(process, SessPid), MRef = erlang:monitor(process, SessPid),

View File

@ -24,8 +24,6 @@
-include("emqttd.hrl"). -include("emqttd.hrl").
-define(SM, emqttd_sm).
-define(HELPER, emqttd_sm_helper). -define(HELPER, emqttd_sm_helper).
%% API %% API
@ -44,11 +42,11 @@ init([]) ->
%% Helper %% Helper
StatsFun = emqttd_stats:statsfun('sessions/count', 'sessions/max'), StatsFun = emqttd_stats:statsfun('sessions/count', 'sessions/max'),
Helper = {?HELPER, {?HELPER, start_link, [StatsFun]}, Helper = {?HELPER, {?HELPER, start_link, [StatsFun]},
permanent, 5000, worker, [?HELPER]}, permanent, 5000, worker, [?HELPER]},
%% SM Pool Sup %% SM Pool Sup
MFA = {?SM, start_link, []}, MFA = {emqttd_sm, start_link, []},
PoolSup = emqttd_pool_sup:spec([?SM, hash, erlang:system_info(schedulers), MFA]), PoolSup = emqttd_pool_sup:spec([emqttd_sm, hash, erlang:system_info(schedulers), MFA]),
{ok, {{one_for_all, 10, 3600}, [Helper, PoolSup]}}. {ok, {{one_for_all, 10, 3600}, [Helper, PoolSup]}}.

View File

@ -57,14 +57,14 @@
%% $SYS Topics for Subscribers %% $SYS Topics for Subscribers
-define(SYSTOP_PUBSUB, [ -define(SYSTOP_PUBSUB, [
'routes/count', % ...
'routes/max', % ...
'topics/count', % ... 'topics/count', % ...
'topics/max', % ... 'topics/max', % ...
'subscribers/count', % ... 'subscribers/count', % ...
'subscribers/max', % ... 'subscribers/max', % ...
'subscriptions/count', % ... 'subscriptions/count', % ...
'subscriptions/max' % ... 'subscriptions/max', % ...
'routes/count', % ...
'routes/max' % ...
]). ]).
%% $SYS Topic for retained %% $SYS Topic for retained