From b92231047cf0a191105c3e5fdbdd4ebca5a3a086 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Wed, 10 Aug 2016 14:02:13 +0800 Subject: [PATCH] subscriber_down/1 --- src/emqttd_pubsub.erl | 22 +++++++++++++++++----- src/emqttd_session.erl | 2 ++ 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/src/emqttd_pubsub.erl b/src/emqttd_pubsub.erl index 71a9197d2..6107704c3 100644 --- a/src/emqttd_pubsub.erl +++ b/src/emqttd_pubsub.erl @@ -37,6 +37,8 @@ -export([async_subscribe/1, async_subscribe/2, async_subscribe/3, async_unsubscribe/1, async_unsubscribe/2]). +-export([subscriber_down/1]). + %% Management API. -export([setqos/3, is_subscribed/2, subscriptions/1]). @@ -89,6 +91,9 @@ async_subscribe(Topic, Subscriber) when is_binary(Topic) -> async_subscribe(Topic, Subscriber, Options) when is_binary(Topic) -> cast(pick(Subscriber), {subscribe, Topic, Subscriber, Options}). +subscriber_down(Subscriber) -> + cast(pick(Subscriber), {down, Subscriber}). + %% @doc Publish message to Topic. -spec(publish(binary(), any()) -> {ok, mqtt_delivery()} | ignore). publish(Topic, Msg) when is_binary(Topic) -> @@ -207,14 +212,15 @@ handle_cast({unsubscribe, Topic, Subscriber}, State) -> {error, _Error} -> {noreply, State} end; +handle_cast({down, Subscriber}, State) -> + subscriber_down_(Subscriber), + {noreply, State}; + handle_cast(Msg, State) -> ?UNEXPECTED_MSG(Msg, State). handle_info({'DOWN', _MRef, process, DownPid, _Reason}, State = #state{submon = PM}) -> - lists:foreach(fun({_, Topic}) -> - subscriber_down(DownPid, Topic) - end, ets:lookup(subscription, DownPid)), - ets:delete(subscription, DownPid), + subscriber_down_(DownPid), {noreply, setstats(State#state{submon = PM:erase(DownPid)}), hibernate}; handle_info(Info, State) -> @@ -261,7 +267,13 @@ do_unsubscribe(Topic, Subscriber, State) -> del_subscription(Subscriber, Topic) -> ets:delete_object(subscription, {Subscriber, Topic}). -subscriber_down(DownPid, Topic) -> +subscriber_down_(Subscriber) -> + lists:foreach(fun({_, Topic}) -> + subscriber_down_(Subscriber, Topic) + end, ets:lookup(subscription, Subscriber)), + ets:delete(subscription, Subscriber). + +subscriber_down_(DownPid, Topic) -> case ets:lookup(subproperty, {Topic, DownPid}) of [] -> %% here? diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index 8c2bbf8bc..63feb4617 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -532,6 +532,8 @@ handle_info(Info, Session) -> ?UNEXPECTED_INFO(Info, Session). terminate(_Reason, #session{clean_sess = CleanSess, client_id = ClientId}) -> + %%TODO: ... + emqttd_pubsub:subscriber_down(ClientId), emqttd_sm:unregister_session(CleanSess, ClientId). code_change(_OldVsn, Session, _Extra) ->