This commit is contained in:
Feng 2015-06-11 13:57:00 +08:00
parent f857f1ec19
commit 4313ed0cf3
1 changed files with 31 additions and 24 deletions

View File

@ -69,15 +69,21 @@
%% Inflight window size
inflight_window = 40,
%% Inflight qos1, qos2 messages sent to the client but unacked, QoS 1 and QoS 2 messages which have been sent to the Client, but have not been completely acknowledged.
%% Inflight qos1, qos2 messages sent to the client but unacked,
%% QoS 1 and QoS 2 messages which have been sent to the Client,
%% but have not been completely acknowledged.
%% Client <- Broker
inflight_queue :: list(),
%% Inflight qos2 messages received from client and waiting for pubrel. QoS 2 messages which have been received from the Client, but have not been completely acknowledged.
%% Inflight qos2 messages received from client and waiting for pubrel.
%% QoS 2 messages which have been received from the Client,
%% but have not been completely acknowledged.
%% Client -> Broker
awaiting_queue :: list(),
%% All qos1, qos2 messages published to when client is disconnected. QoS 1 and QoS 2 messages pending transmission to the Client.
%% All qos1, qos2 messages published to when client is disconnected.
%% QoS 1 and QoS 2 messages pending transmission to the Client.
%%
%% Optionally, QoS 0 messages pending transmission to the Client.
pending_queue :: emqttd_mqueue:mqueue(),
@ -209,33 +215,34 @@ puback(SessPid, {?PUBCOMP, PacketId}) when is_pid(SessPid) ->
%% @end
%%------------------------------------------------------------------------------
-spec subscribe(session(), [{binary(), mqtt_qos()}]) -> {ok, session(), [mqtt_qos()]}.
subscribe(SessState = #session_state{clientid = ClientId, submap = SubMap}, Topics) ->
subscribe(SessState = #session_state{clientid = ClientId, subscriptions = Subscriptions}, Topics) ->
%% subscribe first and don't care if the subscriptions have been existed
{ok, GrantedQos} = emqttd_pubsub:subscribe(Topics),
lager:info([{client, ClientId}], "Client ~s subscribe ~p. Granted QoS: ~p",
[ClientId, Topics, GrantedQos]),
[ClientId, Topics, GrantedQos]),
Subscriptions1 =
lists:foldl(fun({Topic, Qos}, Acc) ->
case lists:keyfind(Topic, 1, Acc) of
{Topic, Qos} ->
lager:warning([{client, ClientId}], "~s resubscribe ~p: qos = ~p", [ClientId, Topic, Qos]), Acc;
{Topic, Old} ->
lager:warning([{client, ClientId}], "~s resubscribe ~p: old qos=~p, new qos=~p",
[ClientId, Topic, Old, Qos]),
lists:keyreplace(Topic, 1, Acc, {Topic, Qos});
false ->
%%TODO: the design is ugly, rewrite later...:(
%% <MQTT V3.1.1>: 3.8.4
%% Where the Topic Filter is not identical to any existing Subscriptions filter,
%% a new Subscription is created and all matching retained messages are sent.
emqttd_msg_store:redeliver(Topic, self()),
[{Topic, Qos} | Acc]
end
end, Subscriptions, Topics),
%% <MQTT V3.1.1>: 3.8.4
%% Where the Topic Filter is not identical to any existing Subscriptions filter,
%% a new Subscription is created and all matching retained messages are sent.
lists:foreach(fun({Name, _Qos}) ->
case maps:is_key(Name, SubMap) of
true ->
lager:warning("~s resubscribe ~p", [ClientId, Name]);
false ->
%%TODO: this is not right, rewrite later...
emqttd_msg_store:redeliver(Name, self())
end
end, Topics),
SubMap1 = lists:foldl(fun({Name, Qos}, Acc) ->
maps:put(Name, Qos, Acc)
end, SubMap, Topics),
{ok, SessState#session_state{submap = SubMap1}, GrantedQos};
{ok, SessState#session_state{subscriptions = Subscriptions1}, GrantedQos};
subscribe(SessPid, Topics) when is_pid(SessPid) ->
{ok, GrantedQos} = gen_server:call(SessPid, {subscribe, Topics}),
@ -246,7 +253,7 @@ subscribe(SessPid, Topics) when is_pid(SessPid) ->
%% @end
%%------------------------------------------------------------------------------
-spec unsubscribe(session(), [binary()]) -> {ok, session()}.
unsubscribe(SessState = #session_state{clientid = ClientId, submap = SubMap}, Topics) ->
unsubscribe(SessState = #session_state{clientid = ClientId, subscriptions = Subscriptions}, Topics) ->
%%TODO: refactor later.
case Topics -- maps:keys(SubMap) of
[] -> ok;