From 4313ed0cf37e98545f851a99112b4204d72a2fa2 Mon Sep 17 00:00:00 2001 From: Feng Date: Thu, 11 Jun 2015 13:57:00 +0800 Subject: [PATCH] comment --- apps/emqttd/src/emqttd_session.erl | 55 +++++++++++++++++------------- 1 file changed, 31 insertions(+), 24 deletions(-) diff --git a/apps/emqttd/src/emqttd_session.erl b/apps/emqttd/src/emqttd_session.erl index 313b8eb3d..8350b89b8 100644 --- a/apps/emqttd/src/emqttd_session.erl +++ b/apps/emqttd/src/emqttd_session.erl @@ -69,15 +69,21 @@ %% Inflight window size inflight_window = 40, - %% Inflight qos1, qos2 messages sent to the client but unacked, QoS 1 and QoS 2 messages which have been sent to the Client, but have not been completely acknowledged. + %% Inflight qos1, qos2 messages sent to the client but unacked, + %% QoS 1 and QoS 2 messages which have been sent to the Client, + %% but have not been completely acknowledged. %% Client <- Broker inflight_queue :: list(), - %% Inflight qos2 messages received from client and waiting for pubrel. QoS 2 messages which have been received from the Client, but have not been completely acknowledged. + %% Inflight qos2 messages received from client and waiting for pubrel. + %% QoS 2 messages which have been received from the Client, + %% but have not been completely acknowledged. %% Client -> Broker awaiting_queue :: list(), - %% All qos1, qos2 messages published to when client is disconnected. QoS 1 and QoS 2 messages pending transmission to the Client. + %% All qos1, qos2 messages published to when client is disconnected. + %% QoS 1 and QoS 2 messages pending transmission to the Client. + %% %% Optionally, QoS 0 messages pending transmission to the Client. pending_queue :: emqttd_mqueue:mqueue(), @@ -209,33 +215,34 @@ puback(SessPid, {?PUBCOMP, PacketId}) when is_pid(SessPid) -> %% @end %%------------------------------------------------------------------------------ -spec subscribe(session(), [{binary(), mqtt_qos()}]) -> {ok, session(), [mqtt_qos()]}. -subscribe(SessState = #session_state{clientid = ClientId, submap = SubMap}, Topics) -> +subscribe(SessState = #session_state{clientid = ClientId, subscriptions = Subscriptions}, Topics) -> %% subscribe first and don't care if the subscriptions have been existed {ok, GrantedQos} = emqttd_pubsub:subscribe(Topics), lager:info([{client, ClientId}], "Client ~s subscribe ~p. Granted QoS: ~p", - [ClientId, Topics, GrantedQos]), + [ClientId, Topics, GrantedQos]), + Subscriptions1 = + lists:foldl(fun({Topic, Qos}, Acc) -> + case lists:keyfind(Topic, 1, Acc) of + {Topic, Qos} -> + lager:warning([{client, ClientId}], "~s resubscribe ~p: qos = ~p", [ClientId, Topic, Qos]), Acc; + {Topic, Old} -> + lager:warning([{client, ClientId}], "~s resubscribe ~p: old qos=~p, new qos=~p", + [ClientId, Topic, Old, Qos]), + lists:keyreplace(Topic, 1, Acc, {Topic, Qos}); + false -> + %%TODO: the design is ugly, rewrite later...:( + %% : 3.8.4 + %% Where the Topic Filter is not identical to any existing Subscription’s filter, + %% a new Subscription is created and all matching retained messages are sent. + emqttd_msg_store:redeliver(Topic, self()), + [{Topic, Qos} | Acc] + end + end, Subscriptions, Topics), - %% : 3.8.4 - %% Where the Topic Filter is not identical to any existing Subscription’s filter, - %% a new Subscription is created and all matching retained messages are sent. - lists:foreach(fun({Name, _Qos}) -> - case maps:is_key(Name, SubMap) of - true -> - lager:warning("~s resubscribe ~p", [ClientId, Name]); - false -> - %%TODO: this is not right, rewrite later... - emqttd_msg_store:redeliver(Name, self()) - end - end, Topics), - - SubMap1 = lists:foldl(fun({Name, Qos}, Acc) -> - maps:put(Name, Qos, Acc) - end, SubMap, Topics), - - {ok, SessState#session_state{submap = SubMap1}, GrantedQos}; + {ok, SessState#session_state{subscriptions = Subscriptions1}, GrantedQos}; subscribe(SessPid, Topics) when is_pid(SessPid) -> {ok, GrantedQos} = gen_server:call(SessPid, {subscribe, Topics}), @@ -246,7 +253,7 @@ subscribe(SessPid, Topics) when is_pid(SessPid) -> %% @end %%------------------------------------------------------------------------------ -spec unsubscribe(session(), [binary()]) -> {ok, session()}. -unsubscribe(SessState = #session_state{clientid = ClientId, submap = SubMap}, Topics) -> +unsubscribe(SessState = #session_state{clientid = ClientId, subscriptions = Subscriptions}, Topics) -> %%TODO: refactor later. case Topics -- maps:keys(SubMap) of [] -> ok;