subscribe, unsubscribe hooks

This commit is contained in:
Feng 2015-07-06 14:07:24 +08:00
parent 988e61708d
commit 949b70f277
2 changed files with 13 additions and 12 deletions

View File

@ -207,9 +207,8 @@ handle(?SUBSCRIBE_PACKET(PacketId, TopicTable), State = #proto_state{client_id =
lager:error("SUBSCRIBE from '~s' Denied: ~p", [ClientId, TopicTable]), lager:error("SUBSCRIBE from '~s' Denied: ~p", [ClientId, TopicTable]),
{ok, State}; {ok, State};
false -> false ->
TopicTable1 = emqttd_broker:foldl_hooks('client.subscribe', [ClientId], TopicTable),
%%TODO: GrantedQos should be renamed. %%TODO: GrantedQos should be renamed.
{ok, GrantedQos} = emqttd_session:subscribe(Session, TopicTable1), {ok, GrantedQos} = emqttd_session:subscribe(Session, TopicTable),
send(?SUBACK_PACKET(PacketId, GrantedQos), State) send(?SUBACK_PACKET(PacketId, GrantedQos), State)
end; end;
@ -221,10 +220,8 @@ handle({subscribe, TopicTable}, State = #proto_state{session = Session}) ->
handle(?UNSUBSCRIBE_PACKET(PacketId, []), State) -> handle(?UNSUBSCRIBE_PACKET(PacketId, []), State) ->
send(?UNSUBACK_PACKET(PacketId), State); send(?UNSUBACK_PACKET(PacketId), State);
handle(?UNSUBSCRIBE_PACKET(PacketId, Topics), State = #proto_state{client_id = ClientId, handle(?UNSUBSCRIBE_PACKET(PacketId, Topics), State = #proto_state{session = Session}) ->
session = Session}) -> ok = emqttd_session:unsubscribe(Session, Topics),
Topics1 = emqttd_broker:foldl_hooks('client.unsubscribe', [ClientId], Topics),
ok = emqttd_session:unsubscribe(Session, Topics1),
send(?UNSUBACK_PACKET(PacketId), State); send(?UNSUBACK_PACKET(PacketId), State);
handle(?PACKET(?PINGREQ), State) -> handle(?PACKET(?PINGREQ), State) ->

View File

@ -234,14 +234,16 @@ init([CleanSess, ClientId, ClientPid]) ->
timestamp = os:timestamp()}, timestamp = os:timestamp()},
{ok, Session, hibernate}. {ok, Session, hibernate}.
handle_call({subscribe, Topics}, _From, Session = #session{client_id = ClientId, handle_call({subscribe, TopicTable0}, _From, Session = #session{client_id = ClientId,
subscriptions = Subscriptions}) -> subscriptions = Subscriptions}) ->
TopicTable = emqttd_broker:foldl_hooks('client.subscribe', [ClientId], TopicTable0),
%% subscribe first and don't care if the subscriptions have been existed %% subscribe first and don't care if the subscriptions have been existed
{ok, GrantedQos} = emqttd_pubsub:subscribe(Topics), {ok, GrantedQos} = emqttd_pubsub:subscribe(TopicTable),
lager:info([{client, ClientId}], "Session ~s subscribe ~p, Granted QoS: ~p", lager:info([{client, ClientId}], "Session ~s subscribe ~p, Granted QoS: ~p",
[ClientId, Topics, GrantedQos]), [ClientId, TopicTable, GrantedQos]),
Subscriptions1 = Subscriptions1 =
lists:foldl(fun({Topic, Qos}, Acc) -> lists:foldl(fun({Topic, Qos}, Acc) ->
@ -261,12 +263,14 @@ handle_call({subscribe, Topics}, _From, Session = #session{client_id = ClientId,
emqttd_retained:dispatch(Topic, self()), emqttd_retained:dispatch(Topic, self()),
[{Topic, Qos} | Acc] [{Topic, Qos} | Acc]
end end
end, Subscriptions, Topics), end, Subscriptions, TopicTable),
{reply, {ok, GrantedQos}, Session#session{subscriptions = Subscriptions1}}; {reply, {ok, GrantedQos}, Session#session{subscriptions = Subscriptions1}};
handle_call({unsubscribe, Topics}, _From, Session = #session{client_id = ClientId, handle_call({unsubscribe, Topics0}, _From, Session = #session{client_id = ClientId,
subscriptions = Subscriptions}) -> subscriptions = Subscriptions}) ->
Topics = emqttd_broker:foldl_hooks('client.unsubscribe', [ClientId], Topics0),
%% unsubscribe from topic tree %% unsubscribe from topic tree
ok = emqttd_pubsub:unsubscribe(Topics), ok = emqttd_pubsub:unsubscribe(Topics),