From 6ceb1c6718b3c3515d461ee33cd8ef75a0dd0567 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Fri, 9 Sep 2016 16:34:29 +0800 Subject: [PATCH] Fix subscribe, unsubscribe --- src/emqttd_client.erl | 16 ++++++---------- src/emqttd_mod_subscription.erl | 1 - src/emqttd_protocol.erl | 23 +++++++++++++++++++++-- src/emqttd_ws_client.erl | 15 ++++++--------- 4 files changed, 33 insertions(+), 22 deletions(-) diff --git a/src/emqttd_client.erl b/src/emqttd_client.erl index 7b7dedcf9..12bd2942a 100644 --- a/src/emqttd_client.erl +++ b/src/emqttd_client.erl @@ -140,14 +140,14 @@ handle_call(Req, _From, State) -> ?UNEXPECTED_REQ(Req, State). handle_cast({subscribe, TopicTable}, State) -> - with_session(fun(SessPid) -> - emqttd_session:subscribe(SessPid, TopicTable) - end, State); + with_proto_state(fun(ProtoState) -> + emqttd_protocol:handle({subscribe, TopicTable}, ProtoState) + end, State); handle_cast({unsubscribe, Topics}, State) -> - with_session(fun(SessPid) -> - emqttd_session:unsubscribe(SessPid, Topics) - end, State); + with_proto_state(fun(ProtoState) -> + emqttd_protocol:handle({unsubscribe, Topics}, ProtoState) + end, State); handle_cast(Msg, State) -> ?UNEXPECTED_MSG(Msg, State). @@ -249,10 +249,6 @@ with_proto_state(Fun, State = #client_state{proto_state = ProtoState}) -> {ok, ProtoState1} = Fun(ProtoState), hibernate(State#client_state{proto_state = ProtoState1}). -with_session(Fun, State = #client_state{proto_state = ProtoState}) -> - Fun(emqttd_protocol:session(ProtoState)), - hibernate(State). - %% Receive and parse tcp data received(<<>>, State) -> hibernate(State); diff --git a/src/emqttd_mod_subscription.erl b/src/emqttd_mod_subscription.erl index a545dcfaf..a0d030391 100644 --- a/src/emqttd_mod_subscription.erl +++ b/src/emqttd_mod_subscription.erl @@ -14,7 +14,6 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% @doc Subscription from Broker Side -module(emqttd_mod_subscription). -behaviour(emqttd_gen_mod). diff --git a/src/emqttd_protocol.erl b/src/emqttd_protocol.erl index d8bc05034..81593a7e5 100644 --- a/src/emqttd_protocol.erl +++ b/src/emqttd_protocol.erl @@ -14,7 +14,6 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% @doc MQTT Protocol Processor. -module(emqttd_protocol). -include("emqttd.hrl"). @@ -28,7 +27,7 @@ %% API -export([init/3, info/1, clientid/1, client/1, session/1]). --export([received/2, send/2, redeliver/2, shutdown/2]). +-export([received/2, handle/2, send/2, redeliver/2, shutdown/2]). -export([process/2]). @@ -116,6 +115,26 @@ received(Packet = ?PACKET(_Type), State) -> {error, Reason, State} end. +handle({subscribe, RawTopicTable}, ProtoState = #proto_state{client_id = ClientId, + username = Username, + session = Session}) -> + TopicTable = parse_topic_table(RawTopicTable), + case emqttd:run_hooks('client.subscribe', [ClientId, Username], TopicTable) of + {ok, TopicTable1} -> + emqttd_session:subscribe(Session, TopicTable1); + {stop, _} -> + ok + end, + {ok, ProtoState}; + +handle({unsubscribe, RawTopics}, ProtoState = #proto_state{client_id = ClientId, + username = Username, + session = Session}) -> + {ok, TopicTable} = emqttd:run_hooks('client.unsubscribe', + [ClientId, Username], parse_topics(RawTopics)), + emqttd_session:unsubscribe(Session, TopicTable), + {ok, ProtoState}. + process(Packet = ?CONNECT_PACKET(Var), State0) -> #mqtt_packet_connect{proto_ver = ProtoVer, diff --git a/src/emqttd_ws_client.erl b/src/emqttd_ws_client.erl index d8f30d309..3ea9cdbdd 100644 --- a/src/emqttd_ws_client.erl +++ b/src/emqttd_ws_client.erl @@ -97,14 +97,14 @@ handle_call(Req, _From, State = #wsclient_state{peer = Peer}) -> {reply, {error, unsupported_request}, State}. handle_cast({subscribe, TopicTable}, State) -> - with_session(fun(SessPid) -> - emqttd_session:subscribe(SessPid, TopicTable) - end, State); + with_proto_state(fun(ProtoState) -> + emqttd_protocol:handle({subscribe, TopicTable}, ProtoState) + end, State); handle_cast({unsubscribe, Topics}, State) -> - with_session(fun(SessPid) -> - emqttd_session:unsubscribe(SessPid, Topics) - end, State); + with_proto_state(fun(ProtoState) -> + emqttd_protocol:handle({unsubscribe, Topics}, ProtoState) + end, State); handle_cast({received, Packet}, State = #wsclient_state{peer = Peer, proto_state = ProtoState}) -> case emqttd_protocol:received(Packet, ProtoState) of @@ -194,9 +194,6 @@ with_proto_state(Fun, State = #wsclient_state{proto_state = ProtoState}) -> {ok, ProtoState1} = Fun(ProtoState), noreply(State#wsclient_state{proto_state = ProtoState1}). -with_session(Fun, State = #wsclient_state{proto_state = ProtoState}) -> - Fun(emqttd_protocol:session(ProtoState)), noreply(State). - noreply(State) -> {noreply, State, hibernate}.