From 0379219a044e9ae9a8f62a9c751620ee9c919098 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Thu, 30 Aug 2018 23:14:09 +0800 Subject: [PATCH] Improve the design of session discard --- src/emqx_connection.erl | 16 ++++--- src/emqx_protocol.erl | 31 ++++++------- src/emqx_session.erl | 92 ++++++++++++++++++++------------------ src/emqx_ws_connection.erl | 9 +++- 4 files changed, 78 insertions(+), 70 deletions(-) diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 16d35585e..bcaea297d 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -217,6 +217,10 @@ handle_info(timeout, State) -> handle_info({shutdown, Error}, State) -> shutdown(Error, State); +handle_info({shutdown, discard, {ClientId, ByPid}}, State) -> + ?LOG(warning, "discarded by ~s:~p", [ClientId, ByPid], State), + shutdown(discard, State); + handle_info({shutdown, conflict, {ClientId, NewPid}}, State) -> ?LOG(warning, "clientid '~s' conflict with ~p", [ClientId, NewPid], State), shutdown(conflict, State); @@ -240,10 +244,10 @@ handle_info({inet_reply, _Sock, ok}, State) -> handle_info({inet_reply, _Sock, {error, Reason}}, State) -> shutdown(Reason, State); -handle_info({keepalive, start, Interval}, State = #state{transport = Transport, socket = Sock}) -> +handle_info({keepalive, start, Interval}, State = #state{transport = Transport, socket = Socket}) -> ?LOG(debug, "Keepalive at the interval of ~p", [Interval], State), StatFun = fun() -> - case Transport:getstat(Sock, [recv_oct]) of + case Transport:getstat(Socket, [recv_oct]) of {ok, [{recv_oct, RecvOct}]} -> {ok, RecvOct}; Error -> Error end @@ -270,11 +274,11 @@ handle_info(Info, State) -> {noreply, State}. terminate(Reason, State = #state{transport = Transport, - socket = Sock, + socket = Socket, keepalive = KeepAlive, proto_state = ProtoState}) -> ?LOG(debug, "Terminated for ~p", [Reason], State), - Transport:fast_close(Sock), + Transport:fast_close(Socket), emqx_keepalive:cancel(KeepAlive), case {ProtoState, Reason} of {undefined, _} -> ok; @@ -358,8 +362,8 @@ run_socket(State = #state{conn_state = blocked}) -> State; run_socket(State = #state{await_recv = true}) -> State; -run_socket(State = #state{transport = Transport, socket = Sock}) -> - Transport:async_recv(Sock, 0, infinity), +run_socket(State = #state{transport = Transport, socket = Socket}) -> + Transport:async_recv(Socket, 0, infinity), State#state{await_recv = true}. %%------------------------------------------------------------------------------ diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index da7ee88b8..5ef4a9b0d 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -402,11 +402,11 @@ process_packet(?PACKET(?DISCONNECT), PState) -> %%------------------------------------------------------------------------------ connack({?RC_SUCCESS, SP, PState}) -> - emqx_hooks:run('client.connected', [credentials(PState), ?RC_SUCCESS, info(PState)]), + emqx_hooks:run('client.connected', [credentials(PState), ?RC_SUCCESS, attrs(PState)]), deliver({connack, ?RC_SUCCESS, sp(SP)}, update_mountpoint(PState)); connack({ReasonCode, PState = #pstate{proto_ver = ProtoVer}}) -> - emqx_hooks:run('client.connected', [credentials(PState), ?RC_SUCCESS, info(PState)]), + emqx_hooks:run('client.connected', [credentials(PState), ?RC_SUCCESS, attrs(PState)]), _ = deliver({connack, if ProtoVer =:= ?MQTT_PROTO_V5 -> ReasonCode; true -> @@ -648,22 +648,17 @@ inc_stats(Type, Stats = #{pkt := PktCnt, msg := MsgCnt}) -> false -> MsgCnt end}. -shutdown(_Error, #pstate{client_id = undefined}) -> - ignore; -shutdown(conflict, #pstate{client_id = ClientId}) -> - emqx_cm:unregister_connection(ClientId), - ignore; -shutdown(mnesia_conflict, #pstate{client_id = ClientId}) -> - emqx_cm:unregister_connection(ClientId), - ignore; -shutdown(Error, PState = #pstate{client_id = ClientId, will_msg = WillMsg}) -> - ?LOG(info, "Shutdown for ~p", [Error], PState), - %% TODO: Auth failure not publish the will message - case Error =:= auth_failure of - true -> ok; - false -> send_willmsg(WillMsg) - end, - emqx_hooks:run('client.disconnected', [credentials(PState), Error]), +shutdown(_Reason, #pstate{client_id = undefined}) -> + ok; +shutdown(Reason, #pstate{client_id = ClientId}) when Reason =:= conflict; + Reason =:= discard -> + emqx_cm:unregister_connection(ClientId); +shutdown(Reason, PState = #pstate{client_id = ClientId, + will_msg = WillMsg, + connected = true}) -> + ?LOG(info, "Shutdown for ~p", [Reason], PState), + _ = send_willmsg(WillMsg), + emqx_hooks:run('client.disconnected', [credentials(PState), Reason]), emqx_cm:unregister_connection(ClientId). send_willmsg(undefined) -> diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 65bce85aa..d5b68a1f6 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -147,6 +147,8 @@ created_at :: erlang:timestamp() }). +-type(spid() :: pid()). + -define(TIMEOUT, 60000). -define(LOG(Level, Format, Args, State), @@ -159,7 +161,7 @@ start_link(SessAttrs) -> proc_lib:start_link(?MODULE, init, [[self(), SessAttrs]]). %% @doc Get session info --spec(info(pid() | #state{}) -> list({atom(), term()})). +-spec(info(spid() | #state{}) -> list({atom(), term()})). info(SPid) when is_pid(SPid) -> gen_server:call(SPid, info, infinity); @@ -187,7 +189,7 @@ info(State = #state{conn_pid = ConnPid, {await_rel_timeout, AwaitRelTimeout}]. %% @doc Get session attrs --spec(attrs(pid() | #state{}) -> list({atom(), term()})). +-spec(attrs(spid() | #state{}) -> list({atom(), term()})). attrs(SPid) when is_pid(SPid) -> gen_server:call(SPid, attrs, infinity); @@ -204,7 +206,7 @@ attrs(#state{clean_start = CleanStart, {expiry_interval, ExpiryInterval div 1000}, {created_at, CreatedAt}]. --spec(stats(pid() | #state{}) -> list({atom(), non_neg_integer()})). +-spec(stats(spid() | #state{}) -> list({atom(), non_neg_integer()})). stats(SPid) when is_pid(SPid) -> gen_server:call(SPid, stats, infinity); @@ -233,19 +235,19 @@ stats(#state{max_subscriptions = MaxSubscriptions, %% PubSub API %%------------------------------------------------------------------------------ --spec(subscribe(pid(), list({emqx_topic:topic(), emqx_types:subopts()})) -> ok). +-spec(subscribe(spid(), list({emqx_topic:topic(), emqx_types:subopts()})) -> ok). subscribe(SPid, RawTopicFilters) when is_list(RawTopicFilters) -> TopicFilters = [emqx_topic:parse(RawTopic, maps:merge(?DEFAULT_SUBOPTS, SubOpts)) || {RawTopic, SubOpts} <- RawTopicFilters], subscribe(SPid, undefined, #{}, TopicFilters). --spec(subscribe(pid(), emqx_mqtt_types:packet_id(), +-spec(subscribe(spid(), emqx_mqtt_types:packet_id(), emqx_mqtt_types:properties(), emqx_mqtt_types:topic_filters()) -> ok). subscribe(SPid, PacketId, Properties, TopicFilters) -> SubReq = {PacketId, Properties, TopicFilters}, gen_server:cast(SPid, {subscribe, self(), SubReq}). --spec(publish(pid(), emqx_mqtt_types:packet_id(), emqx_types:message()) +-spec(publish(spid(), emqx_mqtt_types:packet_id(), emqx_types:message()) -> {ok, emqx_types:deliver_results()}). publish(_SPid, _PacketId, Msg = #message{qos = ?QOS_0}) -> %% Publish QoS0 message to broker directly @@ -259,56 +261,56 @@ publish(SPid, PacketId, Msg = #message{qos = ?QOS_2}) -> %% Publish QoS2 message to session gen_server:call(SPid, {publish, PacketId, Msg}, infinity). --spec(puback(pid(), emqx_mqtt_types:packet_id()) -> ok). +-spec(puback(spid(), emqx_mqtt_types:packet_id()) -> ok). puback(SPid, PacketId) -> gen_server:cast(SPid, {puback, PacketId, ?RC_SUCCESS}). puback(SPid, PacketId, ReasonCode) -> gen_server:cast(SPid, {puback, PacketId, ReasonCode}). --spec(pubrec(pid(), emqx_mqtt_types:packet_id()) -> ok | {error, emqx_mqtt_types:reason_code()}). +-spec(pubrec(spid(), emqx_mqtt_types:packet_id()) -> ok | {error, emqx_mqtt_types:reason_code()}). pubrec(SPid, PacketId) -> pubrec(SPid, PacketId, ?RC_SUCCESS). --spec(pubrec(pid(), emqx_mqtt_types:packet_id(), emqx_mqtt_types:reason_code()) +-spec(pubrec(spid(), emqx_mqtt_types:packet_id(), emqx_mqtt_types:reason_code()) -> ok | {error, emqx_mqtt_types:reason_code()}). pubrec(SPid, PacketId, ReasonCode) -> gen_server:call(SPid, {pubrec, PacketId, ReasonCode}, infinity). --spec(pubrel(pid(), emqx_mqtt_types:packet_id(), emqx_mqtt_types:reason_code()) +-spec(pubrel(spid(), emqx_mqtt_types:packet_id(), emqx_mqtt_types:reason_code()) -> ok | {error, emqx_mqtt_types:reason_code()}). pubrel(SPid, PacketId, ReasonCode) -> gen_server:call(SPid, {pubrel, PacketId, ReasonCode}, infinity). --spec(pubcomp(pid(), emqx_mqtt_types:packet_id(), emqx_mqtt_types:reason_code()) -> ok). +-spec(pubcomp(spid(), emqx_mqtt_types:packet_id(), emqx_mqtt_types:reason_code()) -> ok). pubcomp(SPid, PacketId, ReasonCode) -> gen_server:cast(SPid, {pubcomp, PacketId, ReasonCode}). --spec(unsubscribe(pid(), emqx_types:topic_table()) -> ok). +-spec(unsubscribe(spid(), emqx_types:topic_table()) -> ok). unsubscribe(SPid, RawTopicFilters) when is_list(RawTopicFilters) -> - TopicFilters = lists:map(fun({RawTopic, Opts}) -> - emqx_topic:parse(RawTopic, Opts); - (RawTopic) -> - emqx_topic:parse(RawTopic) - end, RawTopicFilters), + TopicFilters = lists:map(fun({RawTopic, Opts}) -> + emqx_topic:parse(RawTopic, Opts); + (RawTopic) when is_binary(RawTopic) -> + emqx_topic:parse(RawTopic) + end, RawTopicFilters), unsubscribe(SPid, undefined, #{}, TopicFilters). --spec(unsubscribe(pid(), emqx_mqtt_types:packet_id(), +-spec(unsubscribe(spid(), emqx_mqtt_types:packet_id(), emqx_mqtt_types:properties(), emqx_mqtt_types:topic_filters()) -> ok). unsubscribe(SPid, PacketId, Properties, TopicFilters) -> UnsubReq = {PacketId, Properties, TopicFilters}, gen_server:cast(SPid, {unsubscribe, self(), UnsubReq}). --spec(resume(pid(), pid()) -> ok). +-spec(resume(spid(), pid()) -> ok). resume(SPid, ConnPid) -> gen_server:cast(SPid, {resume, ConnPid}). %% @doc Discard the session --spec(discard(pid(), emqx_types:client_id()) -> ok). -discard(SPid, ClientId) -> - gen_server:call(SPid, {discard, ClientId}, infinity). +-spec(discard(spid(), ByPid :: pid()) -> ok). +discard(SPid, ByPid) -> + gen_server:call(SPid, {discard, ByPid}, infinity). --spec(close(pid()) -> ok). +-spec(close(spid()) -> ok). close(SPid) -> gen_server:call(SPid, close, infinity). @@ -367,13 +369,23 @@ init_mqueue(Zone) -> binding(ConnPid) -> case node(ConnPid) =:= node() of true -> local; false -> remote end. -handle_call({discard, ConnPid}, _From, State = #state{conn_pid = undefined}) -> - ?LOG(warning, "Discarded by ~p", [ConnPid], State), +handle_call(info, _From, State) -> + reply(info(State), State); + +handle_call(attrs, _From, State) -> + reply(attrs(State), State); + +handle_call(stats, _From, State) -> + reply(stats(State), State); + +handle_call({discard, ByPid}, _From, State = #state{conn_pid = undefined}) -> + ?LOG(warning, "Discarded by ~p", [ByPid], State), {stop, {shutdown, discard}, ok, State}; -handle_call({discard, ConnPid}, _From, State = #state{conn_pid = OldConnPid}) -> - ?LOG(warning, " ~p kickout ~p", [ConnPid, OldConnPid], State), - {stop, {shutdown, conflict}, ok, State}; +handle_call({discard, ByPid}, _From, State = #state{client_id = ClientId, conn_pid = ConnPid}) -> + ?LOG(warning, "Conn ~p is discarded by ~p", [ConnPid, ByPid], State), + ConnPid ! {shutdown, discard, {ClientId, ByPid}}, + {stop, {shutdown, discard}, ok, State}; %% PUBLISH: handle_call({publish, PacketId, Msg = #message{qos = ?QOS_2, timestamp = Ts}}, _From, @@ -415,15 +427,6 @@ handle_call({pubrel, PacketId, _ReasonCode}, _From, State = #state{awaiting_rel {{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}, State} end); -handle_call(info, _From, State) -> - reply(info(State), State); - -handle_call(attrs, _From, State) -> - reply(attrs(State), State); - -handle_call(stats, _From, State) -> - reply(stats(State), State); - handle_call(close, _From, State) -> {stop, normal, ok, State}; @@ -441,6 +444,7 @@ handle_cast({subscribe, FromPid, {PacketId, _Properties, TopicFilters}}, SubMap; {ok, _SubOpts} -> emqx_broker:set_subopts(Topic, {self(), ClientId}, SubOpts), + %% Why??? emqx_hooks:run('session.subscribed', [#{client_id => ClientId}, Topic, SubOpts]), maps:put(Topic, SubOpts, SubMap); error -> @@ -617,17 +621,17 @@ unsuback(From, PacketId, ReasonCodes) -> From ! {deliver, {unsuback, PacketId, ReasonCodes}}. %%------------------------------------------------------------------------------ -%% Kickout old client +%% Kickout old connection -kick(_ClientId, undefined, _Pid) -> +kick(_ClientId, undefined, _ConnPid) -> ignore; -kick(_ClientId, Pid, Pid) -> +kick(_ClientId, ConnPid, ConnPid) -> ignore; -kick(ClientId, OldPid, Pid) -> - unlink(OldPid), - OldPid ! {shutdown, conflict, {ClientId, Pid}}, +kick(ClientId, OldConnPid, ConnPid) -> + unlink(OldConnPid), + OldConnPid ! {shutdown, conflict, {ClientId, ConnPid}}, %% Clean noproc - receive {'EXIT', OldPid, _} -> ok after 1 -> ok end. + receive {'EXIT', OldConnPid, _} -> ok after 1 -> ok end. %%------------------------------------------------------------------------------ %% Replay or Retry Delivery diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index ed1532565..74014707a 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -34,20 +34,21 @@ options, peername, sockname, + idle_timeout, proto_state, parser_state, keepalive, enable_stats, stats_timer, - idle_timeout, shutdown_reason }). -define(INFO_KEYS, [peername, sockname]). + -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt]). -define(WSLOG(Level, Format, Args, State), - emqx_logger:Level("WsClient(~s): " ++ Format, [esockd_net:format(State#state.peername) | Args])). + emqx_logger:Level("WSMQTT(~s): " ++ Format, [esockd_net:format(State#state.peername) | Args])). %%------------------------------------------------------------------------------ %% API @@ -235,6 +236,10 @@ websocket_info({keepalive, check}, State = #state{keepalive = KeepAlive}) -> shutdown(keepalive_error, State) end; +websocket_info({shutdown, discard, {ClientId, ByPid}}, State) -> + ?WSLOG(warning, "discarded by ~s:~p", [ClientId, ByPid], State), + shutdown(discard, State); + websocket_info({shutdown, conflict, {ClientId, NewPid}}, State) -> ?WSLOG(warning, "clientid '~s' conflict with ~p", [ClientId, NewPid], State), shutdown(conflict, State);