From 0e97dfff296916c0101792709f03d9bd051712b9 Mon Sep 17 00:00:00 2001 From: turtled Date: Mon, 6 Jan 2020 18:46:29 +0800 Subject: [PATCH] Add baidian business --- src/emqx_session.erl | 38 +++++++++++++++++++++++++++++--------- 1 file changed, 29 insertions(+), 9 deletions(-) diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 2b719f699..b3e51546a 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -404,10 +404,19 @@ handle_call({discard, ByPid}, _From, State = #state{conn_pid = undefined}) -> ?LOG(notice, "Discarded by ~p", [ByPid]), {stop, {shutdown, discarded}, ok, State}; -handle_call({discard, ByPid}, _From, State = #state{client_id = ClientId, conn_pid = ConnPid}) -> +handle_call({discard, ByPid}, _From, State = #state{conn_pid = ConnPid, client_id = ClientId}) -> ?LOG(notice, "Conn ~p is discarded by ~p", [ConnPid, ByPid]), - ConnPid ! {shutdown, discard, {ClientId, ByPid}}, - {stop, {shutdown, discarded}, ok, State}; + case ClientId of + <<"d:", _Sn/binary>> -> + ConnPid ! {shutdown, discard, {ClientId, ByPid}}, + {stop, {shutdown, discarded}, ok, State}; + _ -> + Topic = <<"$SYS/kickout">>, + Msg = emqx_message:make(broker, 1, Topic, <<"The client has been kicked out">>), + {_, State1} = handle_dispatch([{Topic, Msg}], State), + erlang:send_after(5000, self(), {kicked, ByPid}), + {reply, ok, State1} + end; %% PUBLISH: This is only to register packetId to session state. %% The actual message dispatching should be done by the caller (e.g. connection) process. @@ -498,16 +507,23 @@ handle_cast({unsubscribe, From, {PacketId, _Properties, TopicFilters}}, noreply(ensure_stats_timer(State#state{subscriptions = Subscriptions1})); %% PUBACK: -handle_cast({puback, PacketId, _ReasonCode}, State = #state{inflight = Inflight}) -> - noreply( - case emqx_inflight:contain(PacketId, Inflight) of +handle_cast({puback, PacketId, _ReasonCode}, State = #state{inflight = Inflight, + client_id = ClientId, + conn_pid = ConnPid}) -> + case emqx_inflight:contain(PacketId, Inflight) of true -> - ensure_stats_timer(dequeue(acked(puback, PacketId, State))); + case emqx_inflight:lookup(PacketId, Inflight) of + {value, {publish, {_, #message{topic = <<"$SYS/kickout">>}}, _Ts}} -> + ConnPid ! {shutdown, discard, {ClientId, undefined}}, + {stop, {shutdown, discarded}, State}; + _ -> + {noreply, ensure_stats_timer(dequeue(acked(puback, PacketId, State)))} + end; false -> ?LOG(warning, "The PUBACK PacketId ~w is not found", [PacketId]), ok = emqx_metrics:inc('packets.puback.missed'), - State - end); + {noreply, State} + end; %% PUBCOMP: handle_cast({pubcomp, PacketId, _ReasonCode}, State = #state{inflight = Inflight}) -> @@ -654,6 +670,10 @@ handle_info({'EXIT', Pid, Reason}, State = #state{conn_pid = ConnPid}) -> handle_info({shutdown, Reason}, State) -> shutdown(Reason, State); +handle_info({kicked, ByPid}, State = #state{client_id = ClientId, conn_pid = ConnPid}) -> + ConnPid ! {shutdown, discard, {ClientId, ByPid}}, + {stop, {shutdown, discarded}, State}; + handle_info(Info, State) -> ?LOG(error, "Unexpected info: ~p", [Info]), {noreply, State}.