Add baidian business

This commit is contained in:
turtled 2020-01-06 18:46:29 +08:00
parent d409e25e76
commit 0e97dfff29
1 changed files with 29 additions and 9 deletions

View File

@ -404,10 +404,19 @@ handle_call({discard, ByPid}, _From, State = #state{conn_pid = undefined}) ->
?LOG(notice, "Discarded by ~p", [ByPid]),
{stop, {shutdown, discarded}, ok, State};
handle_call({discard, ByPid}, _From, State = #state{client_id = ClientId, conn_pid = ConnPid}) ->
handle_call({discard, ByPid}, _From, State = #state{conn_pid = ConnPid, client_id = ClientId}) ->
?LOG(notice, "Conn ~p is discarded by ~p", [ConnPid, ByPid]),
ConnPid ! {shutdown, discard, {ClientId, ByPid}},
{stop, {shutdown, discarded}, ok, State};
case ClientId of
<<"d:", _Sn/binary>> ->
ConnPid ! {shutdown, discard, {ClientId, ByPid}},
{stop, {shutdown, discarded}, ok, State};
_ ->
Topic = <<"$SYS/kickout">>,
Msg = emqx_message:make(broker, 1, Topic, <<"The client has been kicked out">>),
{_, State1} = handle_dispatch([{Topic, Msg}], State),
erlang:send_after(5000, self(), {kicked, ByPid}),
{reply, ok, State1}
end;
%% PUBLISH: This is only to register packetId to session state.
%% The actual message dispatching should be done by the caller (e.g. connection) process.
@ -498,16 +507,23 @@ handle_cast({unsubscribe, From, {PacketId, _Properties, TopicFilters}},
noreply(ensure_stats_timer(State#state{subscriptions = Subscriptions1}));
%% PUBACK:
handle_cast({puback, PacketId, _ReasonCode}, State = #state{inflight = Inflight}) ->
noreply(
case emqx_inflight:contain(PacketId, Inflight) of
handle_cast({puback, PacketId, _ReasonCode}, State = #state{inflight = Inflight,
client_id = ClientId,
conn_pid = ConnPid}) ->
case emqx_inflight:contain(PacketId, Inflight) of
true ->
ensure_stats_timer(dequeue(acked(puback, PacketId, State)));
case emqx_inflight:lookup(PacketId, Inflight) of
{value, {publish, {_, #message{topic = <<"$SYS/kickout">>}}, _Ts}} ->
ConnPid ! {shutdown, discard, {ClientId, undefined}},
{stop, {shutdown, discarded}, State};
_ ->
{noreply, ensure_stats_timer(dequeue(acked(puback, PacketId, State)))}
end;
false ->
?LOG(warning, "The PUBACK PacketId ~w is not found", [PacketId]),
ok = emqx_metrics:inc('packets.puback.missed'),
State
end);
{noreply, State}
end;
%% PUBCOMP:
handle_cast({pubcomp, PacketId, _ReasonCode}, State = #state{inflight = Inflight}) ->
@ -654,6 +670,10 @@ handle_info({'EXIT', Pid, Reason}, State = #state{conn_pid = ConnPid}) ->
handle_info({shutdown, Reason}, State) ->
shutdown(Reason, State);
handle_info({kicked, ByPid}, State = #state{client_id = ClientId, conn_pid = ConnPid}) ->
ConnPid ! {shutdown, discard, {ClientId, ByPid}},
{stop, {shutdown, discarded}, State};
handle_info(Info, State) ->
?LOG(error, "Unexpected info: ~p", [Info]),
{noreply, State}.