From ea70389cf7ea5c3d5b22185d03d1becadb03fff1 Mon Sep 17 00:00:00 2001 From: Feng Date: Fri, 11 Sep 2015 14:18:30 +0800 Subject: [PATCH] ignore duplicated subscriptions --- src/emqttd_session.erl | 58 ++++++++++++++++++++++-------------------- 1 file changed, 31 insertions(+), 27 deletions(-) diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index 8314874f3..4e01ae541 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -282,36 +282,40 @@ prioritise_info(Msg, _Len, _State) -> handle_call({subscribe, TopicTable0}, _From, Session = #session{client_id = ClientId, subscriptions = Subscriptions}) -> - TopicTable = emqttd_broker:foldl_hooks('client.subscribe', [ClientId], TopicTable0), - - %% subscribe first and don't care if the subscriptions have been existed - {ok, GrantedQos} = emqttd_pubsub:subscribe(TopicTable), + case TopicTable0 -- Subscriptions of + [] -> + {reply, {ok, [Qos || {_, Qos} <- TopicTable0]}, Session}; + _ -> + TopicTable = emqttd_broker:foldl_hooks('client.subscribe', [ClientId], TopicTable0), + %% subscribe first and don't care if the subscriptions have been existed + {ok, GrantedQos} = emqttd_pubsub:subscribe(TopicTable), - emqttd_broker:foreach_hooks('client.subscribe.after', [ClientId, TopicTable]), + emqttd_broker:foreach_hooks('client.subscribe.after', [ClientId, TopicTable]), - lager:info([{client, ClientId}], "Session(~s): subscribe ~p, Granted QoS: ~p", - [ClientId, TopicTable, GrantedQos]), + lager:info([{client, ClientId}], "Session(~s): subscribe ~p, Granted QoS: ~p", + [ClientId, TopicTable, GrantedQos]), - Subscriptions1 = - lists:foldl(fun({Topic, Qos}, Acc) -> - case lists:keyfind(Topic, 1, Acc) of - {Topic, Qos} -> - lager:warning([{client, ClientId}], "Session(~s): " - "resubscribe ~s, qos = ~w", [ClientId, Topic, Qos]), Acc; - {Topic, OldQos} -> - lager:warning([{client, ClientId}], "Session(~s): " - "resubscribe ~s, old qos=~w, new qos=~w", [ClientId, Topic, OldQos, Qos]), - lists:keyreplace(Topic, 1, Acc, {Topic, Qos}); - false -> - %%TODO: the design is ugly, rewrite later...:( - %% : 3.8.4 - %% Where the Topic Filter is not identical to any existing Subscription’s filter, - %% a new Subscription is created and all matching retained messages are sent. - emqttd_retained:dispatch(Topic, self()), - [{Topic, Qos} | Acc] - end - end, Subscriptions, TopicTable), - {reply, {ok, GrantedQos}, Session#session{subscriptions = Subscriptions1}}; + Subscriptions1 = + lists:foldl(fun({Topic, Qos}, Acc) -> + case lists:keyfind(Topic, 1, Acc) of + {Topic, Qos} -> + lager:warning([{client, ClientId}], "Session(~s): " + "resubscribe ~s, qos = ~w", [ClientId, Topic, Qos]), Acc; + {Topic, OldQos} -> + lager:warning([{client, ClientId}], "Session(~s): " + "resubscribe ~s, old qos=~w, new qos=~w", [ClientId, Topic, OldQos, Qos]), + lists:keyreplace(Topic, 1, Acc, {Topic, Qos}); + false -> + %%TODO: the design is ugly, rewrite later...:( + %% : 3.8.4 + %% Where the Topic Filter is not identical to any existing Subscription’s filter, + %% a new Subscription is created and all matching retained messages are sent. + emqttd_retained:dispatch(Topic, self()), + [{Topic, Qos} | Acc] + end + end, Subscriptions, TopicTable), + {reply, {ok, GrantedQos}, Session#session{subscriptions = Subscriptions1}} + end; handle_call({unsubscribe, Topics0}, _From, Session = #session{client_id = ClientId, subscriptions = Subscriptions}) ->