Improve the design of session discard

This commit is contained in:
Feng Lee 2018-08-30 23:14:09 +08:00
parent 78a8ccd0f2
commit 0379219a04
4 changed files with 78 additions and 70 deletions

View File

@ -217,6 +217,10 @@ handle_info(timeout, State) ->
handle_info({shutdown, Error}, State) -> handle_info({shutdown, Error}, State) ->
shutdown(Error, State); shutdown(Error, State);
handle_info({shutdown, discard, {ClientId, ByPid}}, State) ->
?LOG(warning, "discarded by ~s:~p", [ClientId, ByPid], State),
shutdown(discard, State);
handle_info({shutdown, conflict, {ClientId, NewPid}}, State) -> handle_info({shutdown, conflict, {ClientId, NewPid}}, State) ->
?LOG(warning, "clientid '~s' conflict with ~p", [ClientId, NewPid], State), ?LOG(warning, "clientid '~s' conflict with ~p", [ClientId, NewPid], State),
shutdown(conflict, State); shutdown(conflict, State);
@ -240,10 +244,10 @@ handle_info({inet_reply, _Sock, ok}, State) ->
handle_info({inet_reply, _Sock, {error, Reason}}, State) -> handle_info({inet_reply, _Sock, {error, Reason}}, State) ->
shutdown(Reason, State); shutdown(Reason, State);
handle_info({keepalive, start, Interval}, State = #state{transport = Transport, socket = Sock}) -> handle_info({keepalive, start, Interval}, State = #state{transport = Transport, socket = Socket}) ->
?LOG(debug, "Keepalive at the interval of ~p", [Interval], State), ?LOG(debug, "Keepalive at the interval of ~p", [Interval], State),
StatFun = fun() -> StatFun = fun() ->
case Transport:getstat(Sock, [recv_oct]) of case Transport:getstat(Socket, [recv_oct]) of
{ok, [{recv_oct, RecvOct}]} -> {ok, RecvOct}; {ok, [{recv_oct, RecvOct}]} -> {ok, RecvOct};
Error -> Error Error -> Error
end end
@ -270,11 +274,11 @@ handle_info(Info, State) ->
{noreply, State}. {noreply, State}.
terminate(Reason, State = #state{transport = Transport, terminate(Reason, State = #state{transport = Transport,
socket = Sock, socket = Socket,
keepalive = KeepAlive, keepalive = KeepAlive,
proto_state = ProtoState}) -> proto_state = ProtoState}) ->
?LOG(debug, "Terminated for ~p", [Reason], State), ?LOG(debug, "Terminated for ~p", [Reason], State),
Transport:fast_close(Sock), Transport:fast_close(Socket),
emqx_keepalive:cancel(KeepAlive), emqx_keepalive:cancel(KeepAlive),
case {ProtoState, Reason} of case {ProtoState, Reason} of
{undefined, _} -> ok; {undefined, _} -> ok;
@ -358,8 +362,8 @@ run_socket(State = #state{conn_state = blocked}) ->
State; State;
run_socket(State = #state{await_recv = true}) -> run_socket(State = #state{await_recv = true}) ->
State; State;
run_socket(State = #state{transport = Transport, socket = Sock}) -> run_socket(State = #state{transport = Transport, socket = Socket}) ->
Transport:async_recv(Sock, 0, infinity), Transport:async_recv(Socket, 0, infinity),
State#state{await_recv = true}. State#state{await_recv = true}.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------

View File

@ -402,11 +402,11 @@ process_packet(?PACKET(?DISCONNECT), PState) ->
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
connack({?RC_SUCCESS, SP, PState}) -> connack({?RC_SUCCESS, SP, PState}) ->
emqx_hooks:run('client.connected', [credentials(PState), ?RC_SUCCESS, info(PState)]), emqx_hooks:run('client.connected', [credentials(PState), ?RC_SUCCESS, attrs(PState)]),
deliver({connack, ?RC_SUCCESS, sp(SP)}, update_mountpoint(PState)); deliver({connack, ?RC_SUCCESS, sp(SP)}, update_mountpoint(PState));
connack({ReasonCode, PState = #pstate{proto_ver = ProtoVer}}) -> connack({ReasonCode, PState = #pstate{proto_ver = ProtoVer}}) ->
emqx_hooks:run('client.connected', [credentials(PState), ?RC_SUCCESS, info(PState)]), emqx_hooks:run('client.connected', [credentials(PState), ?RC_SUCCESS, attrs(PState)]),
_ = deliver({connack, if ProtoVer =:= ?MQTT_PROTO_V5 -> _ = deliver({connack, if ProtoVer =:= ?MQTT_PROTO_V5 ->
ReasonCode; ReasonCode;
true -> true ->
@ -648,22 +648,17 @@ inc_stats(Type, Stats = #{pkt := PktCnt, msg := MsgCnt}) ->
false -> MsgCnt false -> MsgCnt
end}. end}.
shutdown(_Error, #pstate{client_id = undefined}) -> shutdown(_Reason, #pstate{client_id = undefined}) ->
ignore; ok;
shutdown(conflict, #pstate{client_id = ClientId}) -> shutdown(Reason, #pstate{client_id = ClientId}) when Reason =:= conflict;
emqx_cm:unregister_connection(ClientId), Reason =:= discard ->
ignore; emqx_cm:unregister_connection(ClientId);
shutdown(mnesia_conflict, #pstate{client_id = ClientId}) -> shutdown(Reason, PState = #pstate{client_id = ClientId,
emqx_cm:unregister_connection(ClientId), will_msg = WillMsg,
ignore; connected = true}) ->
shutdown(Error, PState = #pstate{client_id = ClientId, will_msg = WillMsg}) -> ?LOG(info, "Shutdown for ~p", [Reason], PState),
?LOG(info, "Shutdown for ~p", [Error], PState), _ = send_willmsg(WillMsg),
%% TODO: Auth failure not publish the will message emqx_hooks:run('client.disconnected', [credentials(PState), Reason]),
case Error =:= auth_failure of
true -> ok;
false -> send_willmsg(WillMsg)
end,
emqx_hooks:run('client.disconnected', [credentials(PState), Error]),
emqx_cm:unregister_connection(ClientId). emqx_cm:unregister_connection(ClientId).
send_willmsg(undefined) -> send_willmsg(undefined) ->

View File

@ -147,6 +147,8 @@
created_at :: erlang:timestamp() created_at :: erlang:timestamp()
}). }).
-type(spid() :: pid()).
-define(TIMEOUT, 60000). -define(TIMEOUT, 60000).
-define(LOG(Level, Format, Args, State), -define(LOG(Level, Format, Args, State),
@ -159,7 +161,7 @@ start_link(SessAttrs) ->
proc_lib:start_link(?MODULE, init, [[self(), SessAttrs]]). proc_lib:start_link(?MODULE, init, [[self(), SessAttrs]]).
%% @doc Get session info %% @doc Get session info
-spec(info(pid() | #state{}) -> list({atom(), term()})). -spec(info(spid() | #state{}) -> list({atom(), term()})).
info(SPid) when is_pid(SPid) -> info(SPid) when is_pid(SPid) ->
gen_server:call(SPid, info, infinity); gen_server:call(SPid, info, infinity);
@ -187,7 +189,7 @@ info(State = #state{conn_pid = ConnPid,
{await_rel_timeout, AwaitRelTimeout}]. {await_rel_timeout, AwaitRelTimeout}].
%% @doc Get session attrs %% @doc Get session attrs
-spec(attrs(pid() | #state{}) -> list({atom(), term()})). -spec(attrs(spid() | #state{}) -> list({atom(), term()})).
attrs(SPid) when is_pid(SPid) -> attrs(SPid) when is_pid(SPid) ->
gen_server:call(SPid, attrs, infinity); gen_server:call(SPid, attrs, infinity);
@ -204,7 +206,7 @@ attrs(#state{clean_start = CleanStart,
{expiry_interval, ExpiryInterval div 1000}, {expiry_interval, ExpiryInterval div 1000},
{created_at, CreatedAt}]. {created_at, CreatedAt}].
-spec(stats(pid() | #state{}) -> list({atom(), non_neg_integer()})). -spec(stats(spid() | #state{}) -> list({atom(), non_neg_integer()})).
stats(SPid) when is_pid(SPid) -> stats(SPid) when is_pid(SPid) ->
gen_server:call(SPid, stats, infinity); gen_server:call(SPid, stats, infinity);
@ -233,19 +235,19 @@ stats(#state{max_subscriptions = MaxSubscriptions,
%% PubSub API %% PubSub API
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec(subscribe(pid(), list({emqx_topic:topic(), emqx_types:subopts()})) -> ok). -spec(subscribe(spid(), list({emqx_topic:topic(), emqx_types:subopts()})) -> ok).
subscribe(SPid, RawTopicFilters) when is_list(RawTopicFilters) -> subscribe(SPid, RawTopicFilters) when is_list(RawTopicFilters) ->
TopicFilters = [emqx_topic:parse(RawTopic, maps:merge(?DEFAULT_SUBOPTS, SubOpts)) TopicFilters = [emqx_topic:parse(RawTopic, maps:merge(?DEFAULT_SUBOPTS, SubOpts))
|| {RawTopic, SubOpts} <- RawTopicFilters], || {RawTopic, SubOpts} <- RawTopicFilters],
subscribe(SPid, undefined, #{}, TopicFilters). subscribe(SPid, undefined, #{}, TopicFilters).
-spec(subscribe(pid(), emqx_mqtt_types:packet_id(), -spec(subscribe(spid(), emqx_mqtt_types:packet_id(),
emqx_mqtt_types:properties(), emqx_mqtt_types:topic_filters()) -> ok). emqx_mqtt_types:properties(), emqx_mqtt_types:topic_filters()) -> ok).
subscribe(SPid, PacketId, Properties, TopicFilters) -> subscribe(SPid, PacketId, Properties, TopicFilters) ->
SubReq = {PacketId, Properties, TopicFilters}, SubReq = {PacketId, Properties, TopicFilters},
gen_server:cast(SPid, {subscribe, self(), SubReq}). gen_server:cast(SPid, {subscribe, self(), SubReq}).
-spec(publish(pid(), emqx_mqtt_types:packet_id(), emqx_types:message()) -spec(publish(spid(), emqx_mqtt_types:packet_id(), emqx_types:message())
-> {ok, emqx_types:deliver_results()}). -> {ok, emqx_types:deliver_results()}).
publish(_SPid, _PacketId, Msg = #message{qos = ?QOS_0}) -> publish(_SPid, _PacketId, Msg = #message{qos = ?QOS_0}) ->
%% Publish QoS0 message to broker directly %% Publish QoS0 message to broker directly
@ -259,56 +261,56 @@ publish(SPid, PacketId, Msg = #message{qos = ?QOS_2}) ->
%% Publish QoS2 message to session %% Publish QoS2 message to session
gen_server:call(SPid, {publish, PacketId, Msg}, infinity). gen_server:call(SPid, {publish, PacketId, Msg}, infinity).
-spec(puback(pid(), emqx_mqtt_types:packet_id()) -> ok). -spec(puback(spid(), emqx_mqtt_types:packet_id()) -> ok).
puback(SPid, PacketId) -> puback(SPid, PacketId) ->
gen_server:cast(SPid, {puback, PacketId, ?RC_SUCCESS}). gen_server:cast(SPid, {puback, PacketId, ?RC_SUCCESS}).
puback(SPid, PacketId, ReasonCode) -> puback(SPid, PacketId, ReasonCode) ->
gen_server:cast(SPid, {puback, PacketId, ReasonCode}). gen_server:cast(SPid, {puback, PacketId, ReasonCode}).
-spec(pubrec(pid(), emqx_mqtt_types:packet_id()) -> ok | {error, emqx_mqtt_types:reason_code()}). -spec(pubrec(spid(), emqx_mqtt_types:packet_id()) -> ok | {error, emqx_mqtt_types:reason_code()}).
pubrec(SPid, PacketId) -> pubrec(SPid, PacketId) ->
pubrec(SPid, PacketId, ?RC_SUCCESS). pubrec(SPid, PacketId, ?RC_SUCCESS).
-spec(pubrec(pid(), emqx_mqtt_types:packet_id(), emqx_mqtt_types:reason_code()) -spec(pubrec(spid(), emqx_mqtt_types:packet_id(), emqx_mqtt_types:reason_code())
-> ok | {error, emqx_mqtt_types:reason_code()}). -> ok | {error, emqx_mqtt_types:reason_code()}).
pubrec(SPid, PacketId, ReasonCode) -> pubrec(SPid, PacketId, ReasonCode) ->
gen_server:call(SPid, {pubrec, PacketId, ReasonCode}, infinity). gen_server:call(SPid, {pubrec, PacketId, ReasonCode}, infinity).
-spec(pubrel(pid(), emqx_mqtt_types:packet_id(), emqx_mqtt_types:reason_code()) -spec(pubrel(spid(), emqx_mqtt_types:packet_id(), emqx_mqtt_types:reason_code())
-> ok | {error, emqx_mqtt_types:reason_code()}). -> ok | {error, emqx_mqtt_types:reason_code()}).
pubrel(SPid, PacketId, ReasonCode) -> pubrel(SPid, PacketId, ReasonCode) ->
gen_server:call(SPid, {pubrel, PacketId, ReasonCode}, infinity). gen_server:call(SPid, {pubrel, PacketId, ReasonCode}, infinity).
-spec(pubcomp(pid(), emqx_mqtt_types:packet_id(), emqx_mqtt_types:reason_code()) -> ok). -spec(pubcomp(spid(), emqx_mqtt_types:packet_id(), emqx_mqtt_types:reason_code()) -> ok).
pubcomp(SPid, PacketId, ReasonCode) -> pubcomp(SPid, PacketId, ReasonCode) ->
gen_server:cast(SPid, {pubcomp, PacketId, ReasonCode}). gen_server:cast(SPid, {pubcomp, PacketId, ReasonCode}).
-spec(unsubscribe(pid(), emqx_types:topic_table()) -> ok). -spec(unsubscribe(spid(), emqx_types:topic_table()) -> ok).
unsubscribe(SPid, RawTopicFilters) when is_list(RawTopicFilters) -> unsubscribe(SPid, RawTopicFilters) when is_list(RawTopicFilters) ->
TopicFilters = lists:map(fun({RawTopic, Opts}) -> TopicFilters = lists:map(fun({RawTopic, Opts}) ->
emqx_topic:parse(RawTopic, Opts); emqx_topic:parse(RawTopic, Opts);
(RawTopic) -> (RawTopic) when is_binary(RawTopic) ->
emqx_topic:parse(RawTopic) emqx_topic:parse(RawTopic)
end, RawTopicFilters), end, RawTopicFilters),
unsubscribe(SPid, undefined, #{}, TopicFilters). unsubscribe(SPid, undefined, #{}, TopicFilters).
-spec(unsubscribe(pid(), emqx_mqtt_types:packet_id(), -spec(unsubscribe(spid(), emqx_mqtt_types:packet_id(),
emqx_mqtt_types:properties(), emqx_mqtt_types:topic_filters()) -> ok). emqx_mqtt_types:properties(), emqx_mqtt_types:topic_filters()) -> ok).
unsubscribe(SPid, PacketId, Properties, TopicFilters) -> unsubscribe(SPid, PacketId, Properties, TopicFilters) ->
UnsubReq = {PacketId, Properties, TopicFilters}, UnsubReq = {PacketId, Properties, TopicFilters},
gen_server:cast(SPid, {unsubscribe, self(), UnsubReq}). gen_server:cast(SPid, {unsubscribe, self(), UnsubReq}).
-spec(resume(pid(), pid()) -> ok). -spec(resume(spid(), pid()) -> ok).
resume(SPid, ConnPid) -> resume(SPid, ConnPid) ->
gen_server:cast(SPid, {resume, ConnPid}). gen_server:cast(SPid, {resume, ConnPid}).
%% @doc Discard the session %% @doc Discard the session
-spec(discard(pid(), emqx_types:client_id()) -> ok). -spec(discard(spid(), ByPid :: pid()) -> ok).
discard(SPid, ClientId) -> discard(SPid, ByPid) ->
gen_server:call(SPid, {discard, ClientId}, infinity). gen_server:call(SPid, {discard, ByPid}, infinity).
-spec(close(pid()) -> ok). -spec(close(spid()) -> ok).
close(SPid) -> close(SPid) ->
gen_server:call(SPid, close, infinity). gen_server:call(SPid, close, infinity).
@ -367,13 +369,23 @@ init_mqueue(Zone) ->
binding(ConnPid) -> binding(ConnPid) ->
case node(ConnPid) =:= node() of true -> local; false -> remote end. case node(ConnPid) =:= node() of true -> local; false -> remote end.
handle_call({discard, ConnPid}, _From, State = #state{conn_pid = undefined}) -> handle_call(info, _From, State) ->
?LOG(warning, "Discarded by ~p", [ConnPid], State), reply(info(State), State);
handle_call(attrs, _From, State) ->
reply(attrs(State), State);
handle_call(stats, _From, State) ->
reply(stats(State), State);
handle_call({discard, ByPid}, _From, State = #state{conn_pid = undefined}) ->
?LOG(warning, "Discarded by ~p", [ByPid], State),
{stop, {shutdown, discard}, ok, State}; {stop, {shutdown, discard}, ok, State};
handle_call({discard, ConnPid}, _From, State = #state{conn_pid = OldConnPid}) -> handle_call({discard, ByPid}, _From, State = #state{client_id = ClientId, conn_pid = ConnPid}) ->
?LOG(warning, " ~p kickout ~p", [ConnPid, OldConnPid], State), ?LOG(warning, "Conn ~p is discarded by ~p", [ConnPid, ByPid], State),
{stop, {shutdown, conflict}, ok, State}; ConnPid ! {shutdown, discard, {ClientId, ByPid}},
{stop, {shutdown, discard}, ok, State};
%% PUBLISH: %% PUBLISH:
handle_call({publish, PacketId, Msg = #message{qos = ?QOS_2, timestamp = Ts}}, _From, handle_call({publish, PacketId, Msg = #message{qos = ?QOS_2, timestamp = Ts}}, _From,
@ -415,15 +427,6 @@ handle_call({pubrel, PacketId, _ReasonCode}, _From, State = #state{awaiting_rel
{{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}, State} {{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}, State}
end); end);
handle_call(info, _From, State) ->
reply(info(State), State);
handle_call(attrs, _From, State) ->
reply(attrs(State), State);
handle_call(stats, _From, State) ->
reply(stats(State), State);
handle_call(close, _From, State) -> handle_call(close, _From, State) ->
{stop, normal, ok, State}; {stop, normal, ok, State};
@ -441,6 +444,7 @@ handle_cast({subscribe, FromPid, {PacketId, _Properties, TopicFilters}},
SubMap; SubMap;
{ok, _SubOpts} -> {ok, _SubOpts} ->
emqx_broker:set_subopts(Topic, {self(), ClientId}, SubOpts), emqx_broker:set_subopts(Topic, {self(), ClientId}, SubOpts),
%% Why???
emqx_hooks:run('session.subscribed', [#{client_id => ClientId}, Topic, SubOpts]), emqx_hooks:run('session.subscribed', [#{client_id => ClientId}, Topic, SubOpts]),
maps:put(Topic, SubOpts, SubMap); maps:put(Topic, SubOpts, SubMap);
error -> error ->
@ -617,17 +621,17 @@ unsuback(From, PacketId, ReasonCodes) ->
From ! {deliver, {unsuback, PacketId, ReasonCodes}}. From ! {deliver, {unsuback, PacketId, ReasonCodes}}.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Kickout old client %% Kickout old connection
kick(_ClientId, undefined, _Pid) -> kick(_ClientId, undefined, _ConnPid) ->
ignore; ignore;
kick(_ClientId, Pid, Pid) -> kick(_ClientId, ConnPid, ConnPid) ->
ignore; ignore;
kick(ClientId, OldPid, Pid) -> kick(ClientId, OldConnPid, ConnPid) ->
unlink(OldPid), unlink(OldConnPid),
OldPid ! {shutdown, conflict, {ClientId, Pid}}, OldConnPid ! {shutdown, conflict, {ClientId, ConnPid}},
%% Clean noproc %% Clean noproc
receive {'EXIT', OldPid, _} -> ok after 1 -> ok end. receive {'EXIT', OldConnPid, _} -> ok after 1 -> ok end.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Replay or Retry Delivery %% Replay or Retry Delivery

View File

@ -34,20 +34,21 @@
options, options,
peername, peername,
sockname, sockname,
idle_timeout,
proto_state, proto_state,
parser_state, parser_state,
keepalive, keepalive,
enable_stats, enable_stats,
stats_timer, stats_timer,
idle_timeout,
shutdown_reason shutdown_reason
}). }).
-define(INFO_KEYS, [peername, sockname]). -define(INFO_KEYS, [peername, sockname]).
-define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt]). -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt]).
-define(WSLOG(Level, Format, Args, State), -define(WSLOG(Level, Format, Args, State),
emqx_logger:Level("WsClient(~s): " ++ Format, [esockd_net:format(State#state.peername) | Args])). emqx_logger:Level("WSMQTT(~s): " ++ Format, [esockd_net:format(State#state.peername) | Args])).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% API %% API
@ -235,6 +236,10 @@ websocket_info({keepalive, check}, State = #state{keepalive = KeepAlive}) ->
shutdown(keepalive_error, State) shutdown(keepalive_error, State)
end; end;
websocket_info({shutdown, discard, {ClientId, ByPid}}, State) ->
?WSLOG(warning, "discarded by ~s:~p", [ClientId, ByPid], State),
shutdown(discard, State);
websocket_info({shutdown, conflict, {ClientId, NewPid}}, State) -> websocket_info({shutdown, conflict, {ClientId, NewPid}}, State) ->
?WSLOG(warning, "clientid '~s' conflict with ~p", [ClientId, NewPid], State), ?WSLOG(warning, "clientid '~s' conflict with ~p", [ClientId, NewPid], State),
shutdown(conflict, State); shutdown(conflict, State);