subscriber_down/1

This commit is contained in:
Feng Lee 2016-08-10 14:02:13 +08:00
parent bf8730d12d
commit b92231047c
2 changed files with 19 additions and 5 deletions

View File

@ -37,6 +37,8 @@
-export([async_subscribe/1, async_subscribe/2, async_subscribe/3,
async_unsubscribe/1, async_unsubscribe/2]).
-export([subscriber_down/1]).
%% Management API.
-export([setqos/3, is_subscribed/2, subscriptions/1]).
@ -89,6 +91,9 @@ async_subscribe(Topic, Subscriber) when is_binary(Topic) ->
async_subscribe(Topic, Subscriber, Options) when is_binary(Topic) ->
cast(pick(Subscriber), {subscribe, Topic, Subscriber, Options}).
subscriber_down(Subscriber) ->
cast(pick(Subscriber), {down, Subscriber}).
%% @doc Publish message to Topic.
-spec(publish(binary(), any()) -> {ok, mqtt_delivery()} | ignore).
publish(Topic, Msg) when is_binary(Topic) ->
@ -207,14 +212,15 @@ handle_cast({unsubscribe, Topic, Subscriber}, State) ->
{error, _Error} -> {noreply, State}
end;
handle_cast({down, Subscriber}, State) ->
subscriber_down_(Subscriber),
{noreply, State};
handle_cast(Msg, State) ->
?UNEXPECTED_MSG(Msg, State).
handle_info({'DOWN', _MRef, process, DownPid, _Reason}, State = #state{submon = PM}) ->
lists:foreach(fun({_, Topic}) ->
subscriber_down(DownPid, Topic)
end, ets:lookup(subscription, DownPid)),
ets:delete(subscription, DownPid),
subscriber_down_(DownPid),
{noreply, setstats(State#state{submon = PM:erase(DownPid)}), hibernate};
handle_info(Info, State) ->
@ -261,7 +267,13 @@ do_unsubscribe(Topic, Subscriber, State) ->
del_subscription(Subscriber, Topic) ->
ets:delete_object(subscription, {Subscriber, Topic}).
subscriber_down(DownPid, Topic) ->
subscriber_down_(Subscriber) ->
lists:foreach(fun({_, Topic}) ->
subscriber_down_(Subscriber, Topic)
end, ets:lookup(subscription, Subscriber)),
ets:delete(subscription, Subscriber).
subscriber_down_(DownPid, Topic) ->
case ets:lookup(subproperty, {Topic, DownPid}) of
[] ->
%% here?

View File

@ -532,6 +532,8 @@ handle_info(Info, Session) ->
?UNEXPECTED_INFO(Info, Session).
terminate(_Reason, #session{clean_sess = CleanSess, client_id = ClientId}) ->
%%TODO: ...
emqttd_pubsub:subscriber_down(ClientId),
emqttd_sm:unregister_session(CleanSess, ClientId).
code_change(_OldVsn, Session, _Extra) ->