diff --git a/src/emqttd_pubsub.erl b/src/emqttd_pubsub.erl index eb4722bb8..036b345aa 100644 --- a/src/emqttd_pubsub.erl +++ b/src/emqttd_pubsub.erl @@ -162,18 +162,14 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- add_subscriber_(Topic, Subscriber) -> - case ets:member(mqtt_subscriber, Topic) of - false -> emqttd_router:add_route(Topic, node()); - true -> ok - end, + (not ets:member(mqtt_subscriber, Topic)) + andalso emqttd_router:add_route(Topic), ets:insert(mqtt_subscriber, {Topic, Subscriber}). del_subscriber_(Topic, Subscriber) -> ets:delete_object(mqtt_subscriber, {Topic, Subscriber}), - case ets:member(mqtt_subscriber, Topic) of - false -> emqttd_router:del_route(Topic, node()); - true -> ok - end. + (not ets:member(mqtt_subscriber, Topic)) + andalso emqttd_router:del_route(Topic). setstats(State) -> emqttd_stats:setstats('subscribers/count', 'subscribers/max', diff --git a/src/emqttd_server.erl b/src/emqttd_server.erl index 345b181f5..e9b2d0eb5 100644 --- a/src/emqttd_server.erl +++ b/src/emqttd_server.erl @@ -172,13 +172,13 @@ init([Pool, Id, Env]) -> {ok, #state{pool = Pool, id = Id, env = Env, submon = emqttd_pmon:new()}}. handle_call({subscribe, Topic, Subscriber, Options}, _From, State) -> - case subscribe_(Topic, Subscriber, Options, State) of + case do_subscribe_(Topic, Subscriber, Options, State) of {ok, NewState} -> {reply, ok, setstats(NewState)}; {error, Error} -> {reply, {error, Error}, State} end; handle_call({unsubscribe, Topic, Subscriber}, _From, State) -> - case unsubscribe_(Topic, Subscriber, State) of + case do_unsubscribe_(Topic, Subscriber, State) of {ok, NewState} -> {reply, ok, setstats(NewState), hibernate}; {error, Error} -> {reply, {error, Error}, State} end; @@ -198,13 +198,13 @@ handle_call(Req, _From, State) -> ?UNEXPECTED_REQ(Req, State). handle_cast({subscribe, Topic, Subscriber, Options}, State) -> - case subscribe_(Topic, Subscriber, Options, State) of + case do_subscribe_(Topic, Subscriber, Options, State) of {ok, NewState} -> {noreply, setstats(NewState)}; {error, _Error} -> {noreply, State} end; handle_cast({unsubscribe, Topic, Subscriber}, State) -> - case unsubscribe_(Topic, Subscriber, State) of + case do_unsubscribe_(Topic, Subscriber, State) of {ok, NewState} -> {noreply, setstats(NewState), hibernate}; {error, _Error} -> {noreply, State} end; @@ -233,7 +233,7 @@ code_change(_OldVsn, State, _Extra) -> %% Internal Functions %%-------------------------------------------------------------------- -subscribe_(Topic, Subscriber, Options, State) -> +do_subscribe_(Topic, Subscriber, Options, State) -> case ets:lookup(mqtt_subproperty, {Topic, Subscriber}) of [] -> emqttd_pubsub:async_subscribe(Topic, Subscriber), @@ -244,7 +244,12 @@ subscribe_(Topic, Subscriber, Options, State) -> {error, {already_subscribed, Topic}} end. -unsubscribe_(Topic, Subscriber, State) -> +monitor_subpid(SubPid, State = #state{submon = PMon}) when is_pid(SubPid) -> + State#state{submon = PMon:monitor(SubPid)}; +monitor_subpid(_SubPid, State) -> + State. + +do_unsubscribe_(Topic, Subscriber, State) -> case ets:lookup(mqtt_subproperty, {Topic, Subscriber}) of [_] -> emqttd_pubsub:async_unsubscribe(Topic, Subscriber), @@ -258,11 +263,6 @@ unsubscribe_(Topic, Subscriber, State) -> {error, {subscription_not_found, Topic}} end. -monitor_subpid(SubPid, State = #state{submon = PMon}) when is_pid(SubPid) -> - State#state{submon = PMon:monitor(SubPid)}; -monitor_subpid(_SubPid, State) -> - State. - demonitor_subpid(SubPid, State = #state{submon = PMon}) when is_pid(SubPid) -> State#state{submon = PMon:demonitor(SubPid)}; demonitor_subpid(_SubPid, State) -> diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index 2035ce3f6..9345071dc 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -284,14 +284,18 @@ handle_call({publish, Msg = #mqtt_message{qos = ?QOS_2, pktid = PktId}}, handle_call(Req, _From, State) -> ?UNEXPECTED_REQ(Req, State). -handle_cast({subscribe, TopicTable0, AckFun}, Session = #session{client_id = ClientId, - subscriptions = Subscriptions}) -> - +handle_cast({subscribe, RawTopicTable, AckFun}, Session = #session{client_id = ClientId, + subscriptions = Subscriptions}) -> + %% TODO: Ugly... + TopicTable0 = lists:map(fun({T, Q}) -> + {T1, Opts} = emqttd_topic:strip(T), + {T1, [{qos, Q} | Opts]} + end, RawTopicTable), case emqttd:run_hooks('client.subscribe', [ClientId], TopicTable0) of {ok, TopicTable} -> ?LOG(info, "Subscribe ~p", [TopicTable], Session), Subscriptions1 = lists:foldl( - fun({Topic, Qos}, SubDict) -> + fun({Topic, Opts = [{qos, Qos}|_]}, SubDict) -> case dict:find(Topic, SubDict) of {ok, Qos} -> ?LOG(warning, "duplicated subscribe: ~s, qos = ~w", [Topic, Qos], Session), @@ -301,7 +305,7 @@ handle_cast({subscribe, TopicTable0, AckFun}, Session = #session{client_id = ?LOG(warning, "duplicated subscribe ~s, old_qos=~w, new_qos=~w", [Topic, OldQos, Qos], Session), dict:store(Topic, Qos, SubDict); error -> - emqttd:subscribe(Topic, ClientId, [{qos, Qos}]), + emqttd:subscribe(Topic, ClientId, Opts), %%TODO: the design is ugly... %% : 3.8.4 %% Where the Topic Filter is not identical to any existing Subscription’s filter, @@ -319,9 +323,11 @@ handle_cast({subscribe, TopicTable0, AckFun}, Session = #session{client_id = hibernate(Session) end; -handle_cast({unsubscribe, Topics0}, Session = #session{client_id = ClientId, - subscriptions = Subscriptions}) -> - +handle_cast({unsubscribe, RawTopics}, Session = #session{client_id = ClientId, + subscriptions = Subscriptions}) -> + Topics0 = lists:map(fun(Topic) -> + {T, _Opts} = emqttd_topic:strip(Topic), T + end, RawTopics), case emqttd:run_hooks('client.unsubscribe', [ClientId], Topics0) of {ok, Topics} -> ?LOG(info, "unsubscribe ~p", [Topics], Session), @@ -329,7 +335,7 @@ handle_cast({unsubscribe, Topics0}, Session = #session{client_id = ClientId, fun(Topic, SubDict) -> case dict:find(Topic, SubDict) of {ok, _Qos} -> - emqttd:unsubscribe(ClientId, Topic), + emqttd:unsubscribe(Topic, ClientId), dict:erase(Topic, SubDict); error -> SubDict