fix issue#280

This commit is contained in:
Feng 2015-09-10 11:18:30 +08:00
parent ecb71fd4b8
commit fa25d60893
7 changed files with 231 additions and 154 deletions

View File

@ -157,9 +157,17 @@ handle_info(Info, State = #state{peername = Peername}) ->
lager:critical("Client ~s: unexpected info ~p",[emqttd_net:format(Peername), Info]), lager:critical("Client ~s: unexpected info ~p",[emqttd_net:format(Peername), Info]),
{noreply, State}. {noreply, State}.
terminate(Reason, #state{peername = Peername, keepalive = KeepAlive, proto_state = ProtoState}) -> terminate(Reason, #state{peername = Peername,
lager:info("Client ~s terminated, reason: ~p", [emqttd_net:format(Peername), Reason]), transport = Transport,
socket = Socket,
keepalive = KeepAlive,
proto_state = ProtoState}) ->
lager:info("Client(~s) terminated, reason: ~p", [emqttd_net:format(Peername), Reason]),
emqttd_keepalive:cancel(KeepAlive), emqttd_keepalive:cancel(KeepAlive),
if
Reason == {shutdown, conn_closed} -> ok;
true -> Transport:fast_close(Socket)
end,
case {ProtoState, Reason} of case {ProtoState, Reason} of
{undefined, _} -> ok; {undefined, _} -> ok;
{_, {shutdown, Error}} -> {_, {shutdown, Error}} ->

View File

@ -176,10 +176,11 @@ sessions(["show", ClientId0]) ->
listeners([]) -> listeners([]) ->
lists:foreach(fun({{Protocol, Port}, Pid}) -> lists:foreach(fun({{Protocol, Port}, Pid}) ->
?PRINT("listener ~s:~p~n", [Protocol, Port]), ?PRINT("listener ~s:~w~n", [Protocol, Port]),
?PRINT(" acceptors: ~p~n", [esockd:get_acceptors(Pid)]), ?PRINT(" acceptors: ~w~n", [esockd:get_acceptors(Pid)]),
?PRINT(" max_clients: ~p~n", [esockd:get_max_clients(Pid)]), ?PRINT(" max_clients: ~w~n", [esockd:get_max_clients(Pid)]),
?PRINT(" current_clients: ~p~n", [esockd:get_current_clients(Pid)]) ?PRINT(" current_clients: ~w~n", [esockd:get_current_clients(Pid)]),
?PRINT(" shutdown_count: ~p~n", [esockd:get_shutdown_count(Pid)])
end, esockd:listeners()). end, esockd:listeners()).
bridges(["list"]) -> bridges(["list"]) ->

View File

@ -78,9 +78,12 @@
%% ClientId: Identifier of Session %% ClientId: Identifier of Session
client_id :: binary(), client_id :: binary(),
%% Client Pid linked with session %% Client Pid bind with session
client_pid :: pid(), client_pid :: pid(),
%% Client Monitor
client_mon :: reference(),
%% Last packet id of the session %% Last packet id of the session
packet_id = 1, packet_id = 1,
@ -136,6 +139,8 @@
timestamp}). timestamp}).
-define(PUBSUB_TIMEOUT, 60000).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc Start a session. %% @doc Start a session.
%% @end %% @end
@ -149,69 +154,69 @@ start_link(CleanSess, ClientId, ClientPid) ->
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec resume(pid(), mqtt_client_id(), pid()) -> ok. -spec resume(pid(), mqtt_client_id(), pid()) -> ok.
resume(Session, ClientId, ClientPid) -> resume(SessPid, ClientId, ClientPid) ->
gen_server2:cast(Session, {resume, ClientId, ClientPid}). gen_server2:cast(SessPid, {resume, ClientId, ClientPid}).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc Destroy a session. %% @doc Destroy a session.
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec destroy(pid(), mqtt_client_id()) -> ok. -spec destroy(pid(), mqtt_client_id()) -> ok.
destroy(Session, ClientId) -> destroy(SessPid, ClientId) ->
gen_server2:call(Session, {destroy, ClientId}). gen_server2:cast(SessPid, {destroy, ClientId}).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc Subscribe Topics %% @doc Subscribe Topics
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec subscribe(pid(), [{binary(), mqtt_qos()}]) -> {ok, [mqtt_qos()]}. -spec subscribe(pid(), [{binary(), mqtt_qos()}]) -> {ok, [mqtt_qos()]}.
subscribe(Session, TopicTable) -> subscribe(SessPid, TopicTable) ->
gen_server2:call(Session, {subscribe, TopicTable}, infinity). gen_server2:call(SessPid, {subscribe, TopicTable}, ?PUBSUB_TIMEOUT).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc Publish message %% @doc Publish message
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec publish(pid(), mqtt_message()) -> ok. -spec publish(pid(), mqtt_message()) -> ok.
publish(_Session, Msg = #mqtt_message{qos = ?QOS_0}) -> publish(_SessPid, Msg = #mqtt_message{qos = ?QOS_0}) ->
%% publish qos0 directly %% publish qos0 directly
emqttd_pubsub:publish(Msg); emqttd_pubsub:publish(Msg);
publish(_Session, Msg = #mqtt_message{qos = ?QOS_1}) -> publish(_SessPid, Msg = #mqtt_message{qos = ?QOS_1}) ->
%% publish qos1 directly, and client will puback automatically %% publish qos1 directly, and client will puback automatically
emqttd_pubsub:publish(Msg); emqttd_pubsub:publish(Msg);
publish(Session, Msg = #mqtt_message{qos = ?QOS_2}) -> publish(SessPid, Msg = #mqtt_message{qos = ?QOS_2}) ->
%% publish qos2 by session %% publish qos2 by session
gen_server2:call(Session, {publish, Msg}). gen_server2:call(SessPid, {publish, Msg}, ?PUBSUB_TIMEOUT).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc PubAck message %% @doc PubAck message
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec puback(pid(), mqtt_packet_id()) -> ok. -spec puback(pid(), mqtt_packet_id()) -> ok.
puback(Session, PktId) -> puback(SessPid, PktId) ->
gen_server2:cast(Session, {puback, PktId}). gen_server2:cast(SessPid, {puback, PktId}).
-spec pubrec(pid(), mqtt_packet_id()) -> ok. -spec pubrec(pid(), mqtt_packet_id()) -> ok.
pubrec(Session, PktId) -> pubrec(SessPid, PktId) ->
gen_server2:cast(Session, {pubrec, PktId}). gen_server2:cast(SessPid, {pubrec, PktId}).
-spec pubrel(pid(), mqtt_packet_id()) -> ok. -spec pubrel(pid(), mqtt_packet_id()) -> ok.
pubrel(Session, PktId) -> pubrel(SessPid, PktId) ->
gen_server2:cast(Session, {pubrel, PktId}). gen_server2:cast(SessPid, {pubrel, PktId}).
-spec pubcomp(pid(), mqtt_packet_id()) -> ok. -spec pubcomp(pid(), mqtt_packet_id()) -> ok.
pubcomp(Session, PktId) -> pubcomp(SessPid, PktId) ->
gen_server2:cast(Session, {pubcomp, PktId}). gen_server2:cast(SessPid, {pubcomp, PktId}).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc Unsubscribe Topics %% @doc Unsubscribe Topics
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec unsubscribe(pid(), [binary()]) -> ok. -spec unsubscribe(pid(), [binary()]) -> ok.
unsubscribe(Session, Topics) -> unsubscribe(SessPid, Topics) ->
gen_server2:call(Session, {unsubscribe, Topics}, infinity). gen_server2:call(SessPid, {unsubscribe, Topics}, ?PUBSUB_TIMEOUT).
%%%============================================================================= %%%=============================================================================
%%% gen_server callbacks %%% gen_server callbacks
@ -219,7 +224,6 @@ unsubscribe(Session, Topics) ->
init([CleanSess, ClientId, ClientPid]) -> init([CleanSess, ClientId, ClientPid]) ->
process_flag(trap_exit, true), process_flag(trap_exit, true),
true = link(ClientPid),
QEnv = emqttd:env(mqtt, queue), QEnv = emqttd:env(mqtt, queue),
SessEnv = emqttd:env(mqtt, session), SessEnv = emqttd:env(mqtt, session),
Session = #session{ Session = #session{
@ -241,12 +245,13 @@ init([CleanSess, ClientId, ClientPid]) ->
collect_interval = emqttd_opts:g(collect_interval, SessEnv, 0), collect_interval = emqttd_opts:g(collect_interval, SessEnv, 0),
timestamp = os:timestamp()}, timestamp = os:timestamp()},
emqttd_sm:register_session(CleanSess, ClientId, info(Session)), emqttd_sm:register_session(CleanSess, ClientId, info(Session)),
%% monitor client
MRef = erlang:monitor(process, ClientPid),
%% start statistics %% start statistics
{ok, start_collector(Session), hibernate}. {ok, start_collector(Session#session{client_mon = MRef}), hibernate}.
prioritise_call(Msg, _From, _Len, _State) -> prioritise_call(Msg, _From, _Len, _State) ->
case Msg of case Msg of
{destroy, _} -> 9;
{unsubscribe, _} -> 2; {unsubscribe, _} -> 2;
{subscribe, _} -> 1; {subscribe, _} -> 1;
_ -> 0 _ -> 0
@ -254,6 +259,7 @@ prioritise_call(Msg, _From, _Len, _State) ->
prioritise_cast(Msg, _Len, _State) -> prioritise_cast(Msg, _Len, _State) ->
case Msg of case Msg of
{destroy, _} -> 10;
{resume, _, _} -> 9; {resume, _, _} -> 9;
{pubrel, _PktId} -> 8; {pubrel, _PktId} -> 8;
{pubcomp, _PktId} -> 8; {pubcomp, _PktId} -> 8;
@ -264,6 +270,7 @@ prioritise_cast(Msg, _Len, _State) ->
prioritise_info(Msg, _Len, _State) -> prioritise_info(Msg, _Len, _State) ->
case Msg of case Msg of
{'DOWN', _, process, _, _} -> 10;
{'EXIT', _, _} -> 10; {'EXIT', _, _} -> 10;
session_expired -> 10; session_expired -> 10;
{timeout, _, _} -> 5; {timeout, _, _} -> 5;
@ -282,18 +289,18 @@ handle_call({subscribe, TopicTable0}, _From, Session = #session{client_id = Clie
emqttd_broker:foreach_hooks('client.subscribe.after', [ClientId, TopicTable]), emqttd_broker:foreach_hooks('client.subscribe.after', [ClientId, TopicTable]),
lager:info([{client, ClientId}], "Session ~s subscribe ~p, Granted QoS: ~p", lager:info([{client, ClientId}], "Session(~s): subscribe ~p, Granted QoS: ~p",
[ClientId, TopicTable, GrantedQos]), [ClientId, TopicTable, GrantedQos]),
Subscriptions1 = Subscriptions1 =
lists:foldl(fun({Topic, Qos}, Acc) -> lists:foldl(fun({Topic, Qos}, Acc) ->
case lists:keyfind(Topic, 1, Acc) of case lists:keyfind(Topic, 1, Acc) of
{Topic, Qos} -> {Topic, Qos} ->
lager:warning([{client, ClientId}], "Session ~s " lager:warning([{client, ClientId}], "Session(~s): "
"resubscribe ~p: qos = ~p", [ClientId, Topic, Qos]), Acc; "resubscribe ~s, qos = ~w", [ClientId, Topic, Qos]), Acc;
{Topic, OldQos} -> {Topic, OldQos} ->
lager:warning([{client, ClientId}], "Session ~s " lager:warning([{client, ClientId}], "Session(~s): "
"resubscribe ~p: old qos=~p, new qos=~p", [ClientId, Topic, OldQos, Qos]), "resubscribe ~s, old qos=~w, new qos=~w", [ClientId, Topic, OldQos, Qos]),
lists:keyreplace(Topic, 1, Acc, {Topic, Qos}); lists:keyreplace(Topic, 1, Acc, {Topic, Qos});
false -> false ->
%%TODO: the design is ugly, rewrite later...:( %%TODO: the design is ugly, rewrite later...:(
@ -314,7 +321,7 @@ handle_call({unsubscribe, Topics0}, _From, Session = #session{client_id = Client
%% unsubscribe from topic tree %% unsubscribe from topic tree
ok = emqttd_pubsub:unsubscribe(Topics), ok = emqttd_pubsub:unsubscribe(Topics),
lager:info([{client, ClientId}], "Session ~s unsubscribe ~p", [ClientId, Topics]), lager:info([{client, ClientId}], "Session(~s) unsubscribe ~p", [ClientId, Topics]),
Subscriptions1 = Subscriptions1 =
lists:foldl(fun(Topic, Acc) -> lists:foldl(fun(Topic, Acc) ->
@ -322,7 +329,7 @@ handle_call({unsubscribe, Topics0}, _From, Session = #session{client_id = Client
{Topic, _Qos} -> {Topic, _Qos} ->
lists:keydelete(Topic, 1, Acc); lists:keydelete(Topic, 1, Acc);
false -> false ->
lager:warning([{client, ClientId}], "Session ~s not subscribe ~s", [ClientId, Topic]), Acc lager:warning([{client, ClientId}], "Session(~s) not subscribe ~s", [ClientId, Topic]), Acc
end end
end, Subscriptions, Topics), end, Subscriptions, Topics),
@ -338,36 +345,46 @@ handle_call({publish, Msg = #mqtt_message{qos = ?QOS_2, pktid = PktId}}, _From,
AwaitingRel1 = maps:put(PktId, {Msg, TRef}, AwaitingRel), AwaitingRel1 = maps:put(PktId, {Msg, TRef}, AwaitingRel),
{reply, ok, Session#session{awaiting_rel = AwaitingRel1}}; {reply, ok, Session#session{awaiting_rel = AwaitingRel1}};
false -> false ->
lager:critical([{client, ClientId}], "Session ~s dropped Qos2 message " lager:critical([{client, ClientId}], "Session(~s) dropped Qos2 message "
"for too many awaiting_rel: ~p", [ClientId, Msg]), "for too many awaiting_rel: ~p", [ClientId, Msg]),
{reply, {error, dropped}, Session} {reply, {error, dropped}, Session}
end; end;
handle_call({destroy, ClientId}, _From, Session = #session{client_id = ClientId}) ->
lager:warning("Session ~s destroyed", [ClientId]),
{stop, {shutdown, destroy}, ok, Session};
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
lager:critical("Unexpected Request: ~p", [Req]), lager:critical("Unexpected Request: ~p", [Req]),
{reply, {error, badreq}, State}. {reply, ok, State}.
handle_cast({destroy, ClientId}, Session = #session{client_id = ClientId}) ->
lager:warning([{client, ClientId}], "Session(~s) destroyed", [ClientId]),
{stop, {shutdown, destroy}, Session};
handle_cast({resume, ClientId, ClientPid}, Session) -> handle_cast({resume, ClientId, ClientPid}, Session) ->
#session{client_id = ClientId, #session{client_id = ClientId,
client_pid = OldClientPid, client_pid = OldClientPid,
client_mon = MRef,
inflight_queue = InflightQ, inflight_queue = InflightQ,
awaiting_ack = AwaitingAck, awaiting_ack = AwaitingAck,
awaiting_comp = AwaitingComp, awaiting_comp = AwaitingComp,
expired_timer = ETimer} = Session, expired_timer = ETimer} = Session,
lager:info([{client, ClientId}], "Session ~s resumed by ~p",[ClientId, ClientPid]), lager:info([{client, ClientId}], "Session(~s) resumed by ~p", [ClientId, ClientPid]),
%% cancel expired timer %% cancel expired timer
cancel_timer(ETimer), cancel_timer(ETimer),
kick(ClientId, ClientPid, OldClientPid), %% Kickout old client
if
true = link(ClientPid), OldClientPid == undefined ->
ok;
OldClientPid == ClientPid ->
ok; %% ??
true ->
lager:error([{client, ClientId}], "Session(~s): ~p kickout ~p",
[ClientId, ClientPid, OldClientPid]),
OldClientPid ! {stop, duplicate_id, ClientPid},
erlang:demonitor(MRef, [flush])
end,
%% Redeliver PUBREL %% Redeliver PUBREL
[ClientPid ! {redeliver, {?PUBREL, PktId}} || PktId <- maps:keys(AwaitingComp)], [ClientPid ! {redeliver, {?PUBREL, PktId}} || PktId <- maps:keys(AwaitingComp)],
@ -379,6 +396,7 @@ handle_cast({resume, ClientId, ClientPid}, Session) ->
[cancel_timer(TRef) || TRef <- maps:values(AwaitingComp)], [cancel_timer(TRef) || TRef <- maps:values(AwaitingComp)],
Session1 = Session#session{client_pid = ClientPid, Session1 = Session#session{client_pid = ClientPid,
client_mon = erlang:monitor(process, ClientPid),
awaiting_ack = #{}, awaiting_ack = #{},
awaiting_comp = #{}, awaiting_comp = #{},
expired_timer = undefined}, expired_timer = undefined},
@ -399,7 +417,7 @@ handle_cast({puback, PktId}, Session = #session{client_id = ClientId, awaiting_a
cancel_timer(TRef), cancel_timer(TRef),
noreply(dequeue(acked(PktId, Session))); noreply(dequeue(acked(PktId, Session)));
error -> error ->
lager:critical("Session(~s): cannot find PUBACK '~p'!", [ClientId, PktId]), lager:error([{client, ClientId}], "Session(~s) cannot find PUBACK ~w", [ClientId, PktId]),
noreply(Session) noreply(Session)
end; end;
@ -416,7 +434,7 @@ handle_cast({pubrec, PktId}, Session = #session{client_id = ClientId,
Session1 = acked(PktId, Session#session{awaiting_comp = AwaitingComp1}), Session1 = acked(PktId, Session#session{awaiting_comp = AwaitingComp1}),
noreply(dequeue(Session1)); noreply(dequeue(Session1));
error -> error ->
lager:error("Session ~s cannot find PUBREC '~p'!", [ClientId, PktId]), lager:error([{client, ClientId}], "Session(~s) cannot find PUBREC ~w", [ClientId, PktId]),
noreply(Session) noreply(Session)
end; end;
@ -429,7 +447,7 @@ handle_cast({pubrel, PktId}, Session = #session{client_id = ClientId,
emqttd_pubsub:publish(Msg), emqttd_pubsub:publish(Msg),
noreply(Session#session{awaiting_rel = maps:remove(PktId, AwaitingRel)}); noreply(Session#session{awaiting_rel = maps:remove(PktId, AwaitingRel)});
error -> error ->
lager:error("Session ~s cannot find PUBREL: pktid=~p!", [ClientId, PktId]), lager:error([{client, ClientId}], "Session(~s) cannot find PUBREL ~w", [ClientId, PktId]),
noreply(Session) noreply(Session)
end; end;
@ -440,7 +458,7 @@ handle_cast({pubcomp, PktId}, Session = #session{client_id = ClientId, awaiting_
cancel_timer(TRef), cancel_timer(TRef),
noreply(Session#session{awaiting_comp = maps:remove(PktId, AwaitingComp)}); noreply(Session#session{awaiting_comp = maps:remove(PktId, AwaitingComp)});
error -> error ->
lager:error("Session ~s cannot find PUBCOMP: PktId=~p", [ClientId, PktId]), lager:error("Session(~s) cannot find PUBCOMP ~w", [ClientId, PktId]),
noreply(Session) noreply(Session)
end; end;
@ -467,8 +485,6 @@ handle_info({dispatch, Msg = #mqtt_message{qos = QoS}}, Session = #session{messa
true -> true ->
{noreply, deliver(Msg, Session)}; {noreply, deliver(Msg, Session)};
false -> false ->
%%TODO: should emit alarm?
%%lager:error([{client, ClientId}], "Session ~s inflight queue is full!", [ClientId]),
{noreply, Session#session{message_queue = emqttd_mqueue:in(Msg, MsgQ)}} {noreply, Session#session{message_queue = emqttd_mqueue:in(Msg, MsgQ)}}
end; end;
@ -487,35 +503,35 @@ handle_info({timeout, awaiting_ack, PktId}, Session = #session{inflight_queue =
{ok, {{Retries, Timeout}, _TRef}} -> {ok, {{Retries, Timeout}, _TRef}} ->
TRef = timer(Timeout, {timeout, awaiting_ack, PktId}), TRef = timer(Timeout, {timeout, awaiting_ack, PktId}),
AwaitingAck1 = maps:put(PktId, {{Retries-1, Timeout*2}, TRef}, AwaitingAck), AwaitingAck1 = maps:put(PktId, {{Retries-1, Timeout*2}, TRef}, AwaitingAck),
{noreply, Session#session{awaiting_ack = AwaitingAck1}}; {noreply, Session#session{awaiting_ack = AwaitingAck1}, hibernate};
error -> error ->
% TODO: too many logs when overloaded... % TODO: too many logs when overloaded...
% lager:error([{client, ClientId}], "Session ~s " % lager:error([{client, ClientId}], "Session ~s "
% "Cannot find Awaiting Ack:~p", [ClientId, PktId]), % "Cannot find Awaiting Ack:~p", [ClientId, PktId]),
{noreply, Session} {noreply, Session, hibernate}
end; end;
handle_info({timeout, awaiting_rel, PktId}, Session = #session{client_id = ClientId, handle_info({timeout, awaiting_rel, PktId}, Session = #session{client_id = ClientId,
awaiting_rel = AwaitingRel}) -> awaiting_rel = AwaitingRel}) ->
case maps:find(PktId, AwaitingRel) of case maps:find(PktId, AwaitingRel) of
{ok, {Msg, _TRef}} -> {ok, {Msg, _TRef}} ->
lager:error([{client, ClientId}], "Session ~s AwaitingRel Timout!~n" lager:error([{client, ClientId}], "Session(~s) AwaitingRel Timout!~n"
"Drop Message:~p", [ClientId, Msg]), "Drop Message:~p", [ClientId, Msg]),
noreply(Session#session{awaiting_rel = maps:remove(PktId, AwaitingRel)}); noreply(Session#session{awaiting_rel = maps:remove(PktId, AwaitingRel)});
error -> error ->
lager:error([{client, ClientId}], "Session ~s Cannot find AwaitingRel: PktId=~p", [ClientId, PktId]), lager:error([{client, ClientId}], "Session(~s) cannot find AwaitingRel ~w", [ClientId, PktId]),
{noreply, Session} {noreply, Session, hibernate}
end; end;
handle_info({timeout, awaiting_comp, PktId}, Session = #session{client_id = ClientId, handle_info({timeout, awaiting_comp, PktId}, Session = #session{client_id = ClientId,
awaiting_comp = Awaiting}) -> awaiting_comp = Awaiting}) ->
case maps:find(PktId, Awaiting) of case maps:find(PktId, Awaiting) of
{ok, _TRef} -> {ok, _TRef} ->
lager:error([{client, ClientId}], "Session ~s " lager:error([{client, ClientId}], "Session(~s) "
"Awaiting PUBCOMP Timout: PktId=~p!", [ClientId, PktId]), "Awaiting PUBCOMP Timout: PktId=~p!", [ClientId, PktId]),
noreply(Session#session{awaiting_comp = maps:remove(PktId, Awaiting)}); noreply(Session#session{awaiting_comp = maps:remove(PktId, Awaiting)});
error -> error ->
lager:error([{client, ClientId}], "Session ~s " lager:error([{client, ClientId}], "Session(~s) "
"Cannot find Awaiting PUBCOMP: PktId=~p", [ClientId, PktId]), "Cannot find Awaiting PUBCOMP: PktId=~p", [ClientId, PktId]),
noreply(Session) noreply(Session)
end; end;
@ -524,32 +540,29 @@ handle_info(collect_info, Session = #session{clean_sess = CleanSess, client_id =
emqttd_sm:register_session(CleanSess, ClientId, info(Session)), emqttd_sm:register_session(CleanSess, ClientId, info(Session)),
{noreply, start_collector(Session), hibernate}; {noreply, start_collector(Session), hibernate};
handle_info({'EXIT', ClientPid, _Reason}, Session = #session{clean_sess = true, handle_info({'DOWN', _MRef, process, ClientPid, _}, Session = #session{clean_sess = true,
client_pid = ClientPid}) -> client_pid = ClientPid}) ->
{stop, normal, Session}; {stop, normal, Session};
handle_info({'EXIT', ClientPid, Reason}, Session = #session{clean_sess = false, handle_info({'DOWN', _MRef, process, ClientPid, _}, Session = #session{clean_sess = false,
client_id = ClientId,
client_pid = ClientPid, client_pid = ClientPid,
expired_after = Expires}) -> expired_after = Expires}) ->
lager:info("Session ~s unlink with client ~p: reason=~p",
[ClientId, ClientPid, Reason]),
TRef = timer(Expires, session_expired), TRef = timer(Expires, session_expired),
noreply(Session#session{client_pid = undefined, expired_timer = TRef}); noreply(Session#session{client_pid = undefined, client_mon = undefined, expired_timer = TRef});
handle_info({'EXIT', Pid, _Reason}, Session = #session{client_id = ClientId, handle_info({'DOWN', _MRef, process, Pid, Reason}, Session = #session{client_id = ClientId,
client_pid = ClientPid}) -> client_pid = ClientPid}) ->
lager:error([{client, ClientId}], "Session(~s): unexpected DOWN: "
lager:error("Session ~s received unexpected EXIT:" "client_pid=~p, down_pid=~p, reason=~p",
" client_pid=~p, exit_pid=~p", [ClientId, ClientPid, Pid]), [ClientId, ClientPid, Pid, Reason]),
{noreply, Session}; noreply(Session);
handle_info(session_expired, Session = #session{client_id = ClientId}) -> handle_info(session_expired, Session = #session{client_id = ClientId}) ->
lager:error("Session ~s expired, shutdown now!", [ClientId]), lager:error("Session(~s) expired, shutdown now.", [ClientId]),
{stop, {shutdown, expired}, Session}; {stop, {shutdown, expired}, Session};
handle_info(Info, Session = #session{client_id = ClientId}) -> handle_info(Info, Session = #session{client_id = ClientId}) ->
lager:critical("Session ~s received unexpected info: ~p", [ClientId, Info]), lager:critical("Session(~s) unexpected info: ~p", [ClientId, Info]),
{noreply, Session}. {noreply, Session}.
terminate(_Reason, #session{clean_sess = CleanSess, client_id = ClientId}) -> terminate(_Reason, #session{clean_sess = CleanSess, client_id = ClientId}) ->
@ -562,19 +575,6 @@ code_change(_OldVsn, Session, _Extra) ->
%%% Internal functions %%% Internal functions
%%%============================================================================= %%%=============================================================================
%%------------------------------------------------------------------------------
%% Kick duplicated client
%%------------------------------------------------------------------------------
kick(_ClientId, _ClientPid, undefined) ->
ok;
kick(_ClientId, ClientPid, ClientPid) ->
ok;
kick(ClientId, ClientPid, OldClientPid) ->
lager:warning("Session '~s' is duplicated: pid=~p, oldpid=~p", [ClientId, ClientPid, OldClientPid]),
unlink(OldClientPid),
OldClientPid ! {stop, duplicate_id, ClientPid}.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Check inflight and awaiting_rel %% Check inflight and awaiting_rel
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
@ -688,13 +688,12 @@ info(#session{clean_sess = CleanSess,
awaiting_comp = AwaitingComp, awaiting_comp = AwaitingComp,
timestamp = CreatedAt}) -> timestamp = CreatedAt}) ->
Stats = emqttd_mqueue:stats(MessageQueue), Stats = emqttd_mqueue:stats(MessageQueue),
[{pid, self()}, [{clean_sess, CleanSess},
{clean_sess, CleanSess},
{subscriptions, Subscriptions}, {subscriptions, Subscriptions},
{max_inflight, MaxInflight}, {max_inflight, MaxInflight},
{inflight_queue, length(InflightQueue)}, {inflight_queue, length(InflightQueue)},
{message_queue, proplists:get_value(len, Stats)}, {message_queue, proplists:get_value(len, Stats)},
{message_dropped, proplists:get_value(dropped, Stats)}, {message_dropped,proplists:get_value(dropped, Stats)},
{awaiting_rel, maps:size(AwaitingRel)}, {awaiting_rel, maps:size(AwaitingRel)},
{awaiting_ack, maps:size(AwaitingAck)}, {awaiting_ack, maps:size(AwaitingAck)},
{awaiting_comp, maps:size(AwaitingComp)}, {awaiting_comp, maps:size(AwaitingComp)},

View File

@ -38,7 +38,7 @@
-copy_mnesia({mnesia, [copy]}). -copy_mnesia({mnesia, [copy]}).
%% API Function Exports %% API Function Exports
-export([start_link/2, pool/0]). -export([start_link/1, pool/0]).
-export([start_session/2, lookup_session/1]). -export([start_session/2, lookup_session/1]).
@ -53,7 +53,7 @@
%% gen_server2 priorities %% gen_server2 priorities
-export([prioritise_call/4, prioritise_cast/3, prioritise_info/3]). -export([prioritise_call/4, prioritise_cast/3, prioritise_info/3]).
-record(state, {id, statsfun}). -record(state, {id}).
-define(SM_POOL, sm_pool). -define(SM_POOL, sm_pool).
@ -81,11 +81,9 @@ mnesia(copy) ->
%% @doc Start a session manager %% @doc Start a session manager
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec start_link(Id, StatsFun) -> {ok, pid()} | ignore | {error, any()} when -spec start_link(Id :: pos_integer()) -> {ok, pid()} | ignore | {error, any()}.
Id :: pos_integer(), start_link(Id) ->
StatsFun :: fun(). gen_server2:start_link(?MODULE, [Id], []).
start_link(Id, StatsFun) ->
gen_server2:start_link(?MODULE, [Id, StatsFun], []).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc Pool name. %% @doc Pool name.
@ -121,12 +119,8 @@ lookup_session(ClientId) ->
CleanSess :: boolean(), CleanSess :: boolean(),
ClientId :: binary(), ClientId :: binary(),
Info :: [tuple()]. Info :: [tuple()].
register_session(true, ClientId, Info) -> register_session(CleanSess, ClientId, Info) ->
ets:insert(mqtt_transient_session, {ClientId, Info}); ets:insert(sesstab(CleanSess), {{ClientId, self()}, Info}).
register_session(false, ClientId, Info) ->
SM = gproc_pool:pick_worker(?SM_POOL, ClientId),
gen_server2:cast(SM, {register, ClientId, Info}).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc Unregister a session. %% @doc Unregister a session.
@ -135,21 +129,24 @@ register_session(false, ClientId, Info) ->
-spec unregister_session(CleanSess, ClientId) -> ok when -spec unregister_session(CleanSess, ClientId) -> ok when
CleanSess :: boolean(), CleanSess :: boolean(),
ClientId :: binary(). ClientId :: binary().
unregister_session(true, ClientId) -> unregister_session(CleanSess, ClientId) ->
ets:delete(mqtt_transient_session, ClientId); ets:delete(sesstab(CleanSess), {ClientId, self()}).
unregister_session(false, ClientId) ->
SM = gproc_pool:pick_worker(?SM_POOL, ClientId),
gen_server2:cast(SM, {unregister, ClientId}).
call(SM, Req) -> gen_server2:call(SM, Req, infinity). sesstab(true) ->
mqtt_transient_session;
sesstab(false) ->
mqtt_persistent_session.
call(SM, Req) ->
gen_server2:call(SM, Req, infinity).
%%%============================================================================= %%%=============================================================================
%%% gen_server callbacks %%% gen_server callbacks
%%%============================================================================= %%%=============================================================================
init([Id, StatsFun]) -> init([Id]) ->
gproc_pool:connect_worker(?SM_POOL, {?MODULE, Id}), gproc_pool:connect_worker(?SM_POOL, {?MODULE, Id}),
{ok, #state{id = Id, statsfun = StatsFun}}. {ok, #state{id = Id}}.
prioritise_call(_Msg, _From, _Len, _State) -> prioritise_call(_Msg, _From, _Len, _State) ->
1. 1.
@ -186,15 +183,6 @@ handle_call({start_session, {true, ClientId, ClientPid}}, _From, State) ->
handle_call(_Request, _From, State) -> handle_call(_Request, _From, State) ->
{reply, ok, State}. {reply, ok, State}.
%% persistent session
handle_cast({register, ClientId, Info}, State) ->
ets:insert(mqtt_persistent_session, {ClientId, Info}),
{noreply, setstats(State)};
handle_cast({unregister, ClientId}, State) ->
ets:delete(mqtt_persistent_session, ClientId),
{noreply, setstats(State)};
handle_cast(Msg, State) -> handle_cast(Msg, State) ->
lager:critical("Unexpected Msg: ~p", [Msg]), lager:critical("Unexpected Msg: ~p", [Msg]),
{noreply, State}. {noreply, State}.
@ -204,7 +192,7 @@ handle_info({'DOWN', _MRef, process, DownPid, _Reason}, State) ->
[mnesia:delete_object(session, Sess, write) || Sess [mnesia:delete_object(session, Sess, write) || Sess
<- mnesia:index_read(session, DownPid, #mqtt_session.sess_pid)] <- mnesia:index_read(session, DownPid, #mqtt_session.sess_pid)]
end), end),
{noreply, setstats(State)}; {noreply, State};
handle_info(Info, State) -> handle_info(Info, State) ->
lager:critical("Unexpected Info: ~p", [Info]), lager:critical("Unexpected Info: ~p", [Info]),
@ -325,10 +313,5 @@ destroy_session(Session = #mqtt_session{client_id = ClientId,
end. end.
remove_session(Session) -> remove_session(Session) ->
mnesia:transaction(fun() -> mnesia:transaction(fun() -> mnesia:delete_object(session, Session, write) end).
mnesia:delete_object(session, Session, write)
end).
setstats(State = #state{statsfun = StatsFun}) ->
StatsFun(ets:info(mqtt_persistent_session, size)), State.

83
src/emqttd_sm_helper.erl Normal file
View File

@ -0,0 +1,83 @@
%%%-----------------------------------------------------------------------------
%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved.
%%%
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
%%% of this software and associated documentation files (the "Software"), to deal
%%% in the Software without restriction, including without limitation the rights
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%%% copies of the Software, and to permit persons to whom the Software is
%%% furnished to do so, subject to the following conditions:
%%%
%%% The above copyright notice and this permission notice shall be included in all
%%% copies or substantial portions of the Software.
%%%
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% emqttd session helper.
%%%
%%% @end
%%%-----------------------------------------------------------------------------
%% TODO: Monitor mnesia node down...
-module(emqttd_sm_helper).
-author("Feng Lee <feng@emqtt.io>").
-include("emqttd.hrl").
%% API Function Exports
-export([start_link/0]).
-behaviour(gen_server).
%% gen_server Function Exports
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-record(state, {statsfun, ticker}).
%%------------------------------------------------------------------------------
%% @doc Start a session helper
%% @end
%%------------------------------------------------------------------------------
-spec start_link() -> {ok, pid()} | ignore | {error, any()}.
start_link() ->
gen_server:start_link(?MODULE, [], []).
init([]) ->
StatsFun = emqttd_stats:statsfun('sessions/count', 'sessions/max'),
{ok, TRef} = timer:send_interval(1000, self(), tick),
{ok, #state{statsfun = StatsFun, ticker = TRef}}.
handle_call(_Request, _From, State) ->
{reply, ok, State}.
handle_cast(Msg, State) ->
lager:critical("Unexpected Msg: ~p", [Msg]),
{noreply, State}.
handle_info(tick, State) ->
{noreply, setstats(State)};
handle_info(Info, State) ->
lager:critical("Unexpected Info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, _State = #state{ticker = TRef}) ->
timer:cancel(TRef).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
setstats(State = #state{statsfun = StatsFun}) ->
StatsFun(ets:info(mqtt_persistent_session, size)), State.

View File

@ -31,6 +31,9 @@
-include("emqttd.hrl"). -include("emqttd.hrl").
-define(CHILD(Mod), {Mod, {Mod, start_link, []},
permanent, 5000, worker, [Mod]}).
%% API %% API
-export([start_link/0]). -export([start_link/0]).
@ -46,15 +49,14 @@ init([]) ->
init_session_ets(), init_session_ets(),
Schedulers = erlang:system_info(schedulers), Schedulers = erlang:system_info(schedulers),
gproc_pool:new(emqttd_sm:pool(), hash, [{size, Schedulers}]), gproc_pool:new(emqttd_sm:pool(), hash, [{size, Schedulers}]),
StatsFun = emqttd_stats:statsfun('sessions/count', 'sessions/max'), Managers = lists:map(
Children = lists:map(
fun(I) -> fun(I) ->
Name = {emqttd_sm, I}, Name = {emqttd_sm, I},
gproc_pool:add_worker(emqttd_sm:pool(), Name, I), gproc_pool:add_worker(emqttd_sm:pool(), Name, I),
{Name, {emqttd_sm, start_link, [I, StatsFun]}, {Name, {emqttd_sm, start_link, [I]},
permanent, 10000, worker, [emqttd_sm]} permanent, 10000, worker, [emqttd_sm]}
end, lists:seq(1, Schedulers)), end, lists:seq(1, Schedulers)),
{ok, {{one_for_all, 10, 100}, Children}}. {ok, {{one_for_all, 10, 100}, [?CHILD(emqttd_sm_helper) | Managers]}}.
init_session_ets() -> init_session_ets() ->
Tables = [mqtt_transient_session, mqtt_persistent_session], Tables = [mqtt_transient_session, mqtt_persistent_session],

View File

@ -40,7 +40,8 @@
-export([init/1]). -export([init/1]).
%% Helper macro for declaring children of supervisor %% Helper macro for declaring children of supervisor
-define(CHILD(Mod, Type), {Mod, {Mod, start_link, []}, permanent, 5000, Type, [Mod]}). -define(CHILD(Mod, Type), {Mod, {Mod, start_link, []},
permanent, 5000, Type, [Mod]}).
%%%============================================================================= %%%=============================================================================
%%% API %%% API