From 949b70f277015f2e1bed45e86f50b20d3623b577 Mon Sep 17 00:00:00 2001 From: Feng Date: Mon, 6 Jul 2015 14:07:24 +0800 Subject: [PATCH] subscribe, unsubscribe hooks --- src/emqttd_protocol.erl | 9 +++------ src/emqttd_session.erl | 16 ++++++++++------ 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/src/emqttd_protocol.erl b/src/emqttd_protocol.erl index 28ae04d56..3aa4c6292 100644 --- a/src/emqttd_protocol.erl +++ b/src/emqttd_protocol.erl @@ -207,9 +207,8 @@ handle(?SUBSCRIBE_PACKET(PacketId, TopicTable), State = #proto_state{client_id = lager:error("SUBSCRIBE from '~s' Denied: ~p", [ClientId, TopicTable]), {ok, State}; false -> - TopicTable1 = emqttd_broker:foldl_hooks('client.subscribe', [ClientId], TopicTable), %%TODO: GrantedQos should be renamed. - {ok, GrantedQos} = emqttd_session:subscribe(Session, TopicTable1), + {ok, GrantedQos} = emqttd_session:subscribe(Session, TopicTable), send(?SUBACK_PACKET(PacketId, GrantedQos), State) end; @@ -221,10 +220,8 @@ handle({subscribe, TopicTable}, State = #proto_state{session = Session}) -> handle(?UNSUBSCRIBE_PACKET(PacketId, []), State) -> send(?UNSUBACK_PACKET(PacketId), State); -handle(?UNSUBSCRIBE_PACKET(PacketId, Topics), State = #proto_state{client_id = ClientId, - session = Session}) -> - Topics1 = emqttd_broker:foldl_hooks('client.unsubscribe', [ClientId], Topics), - ok = emqttd_session:unsubscribe(Session, Topics1), +handle(?UNSUBSCRIBE_PACKET(PacketId, Topics), State = #proto_state{session = Session}) -> + ok = emqttd_session:unsubscribe(Session, Topics), send(?UNSUBACK_PACKET(PacketId), State); handle(?PACKET(?PINGREQ), State) -> diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index 2971e3109..6b32b844b 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -234,14 +234,16 @@ init([CleanSess, ClientId, ClientPid]) -> timestamp = os:timestamp()}, {ok, Session, hibernate}. -handle_call({subscribe, Topics}, _From, Session = #session{client_id = ClientId, - subscriptions = Subscriptions}) -> +handle_call({subscribe, TopicTable0}, _From, Session = #session{client_id = ClientId, + subscriptions = Subscriptions}) -> + TopicTable = emqttd_broker:foldl_hooks('client.subscribe', [ClientId], TopicTable0), + %% subscribe first and don't care if the subscriptions have been existed - {ok, GrantedQos} = emqttd_pubsub:subscribe(Topics), + {ok, GrantedQos} = emqttd_pubsub:subscribe(TopicTable), lager:info([{client, ClientId}], "Session ~s subscribe ~p, Granted QoS: ~p", - [ClientId, Topics, GrantedQos]), + [ClientId, TopicTable, GrantedQos]), Subscriptions1 = lists:foldl(fun({Topic, Qos}, Acc) -> @@ -261,12 +263,14 @@ handle_call({subscribe, Topics}, _From, Session = #session{client_id = ClientId, emqttd_retained:dispatch(Topic, self()), [{Topic, Qos} | Acc] end - end, Subscriptions, Topics), + end, Subscriptions, TopicTable), {reply, {ok, GrantedQos}, Session#session{subscriptions = Subscriptions1}}; -handle_call({unsubscribe, Topics}, _From, Session = #session{client_id = ClientId, +handle_call({unsubscribe, Topics0}, _From, Session = #session{client_id = ClientId, subscriptions = Subscriptions}) -> + Topics = emqttd_broker:foldl_hooks('client.unsubscribe', [ClientId], Topics0), + %% unsubscribe from topic tree ok = emqttd_pubsub:unsubscribe(Topics),