This commit is contained in:
Feng 2015-12-07 21:19:12 +08:00
parent df5daaac5b
commit fd6a350d9c
1 changed files with 56 additions and 47 deletions

View File

@ -85,9 +85,8 @@
%% Last packet id of the session %% Last packet id of the session
packet_id = 1, packet_id = 1,
%%TODO: Removed??
%% Clients subscriptions. %% Clients subscriptions.
subscriptions :: list(), subscriptions :: dict:dict(),
%% Inflight qos1, qos2 messages sent to the client but unacked, %% Inflight qos1, qos2 messages sent to the client but unacked,
%% QoS 1 and QoS 2 messages which have been sent to the Client, %% QoS 1 and QoS 2 messages which have been sent to the Client,
@ -246,7 +245,7 @@ init([CleanSess, ClientId, ClientPid]) ->
clean_sess = CleanSess, clean_sess = CleanSess,
client_id = ClientId, client_id = ClientId,
client_pid = ClientPid, client_pid = ClientPid,
subscriptions = [], subscriptions = dict:new(),
inflight_queue = [], inflight_queue = [],
max_inflight = emqttd_opts:g(max_inflight, SessEnv, 0), max_inflight = emqttd_opts:g(max_inflight, SessEnv, 0),
message_queue = emqttd_mqueue:new(ClientId, QEnv, emqttd_alarm:alarm_fun()), message_queue = emqttd_mqueue:new(ClientId, QEnv, emqttd_alarm:alarm_fun()),
@ -288,7 +287,7 @@ prioritise_info(Msg, _Len, _State) ->
expired -> 10; expired -> 10;
{timeout, _, _} -> 5; {timeout, _, _} -> 5;
collect_info -> 2; collect_info -> 2;
{dispatch, _} -> 1; {dispatch, _, _} -> 1;
_ -> 0 _ -> 0
end. end.
@ -316,7 +315,7 @@ handle_cast({subscribe, TopicTable0, AckFun}, Session = #session{client_id = Cli
TopicTable = emqttd_broker:foldl_hooks('client.subscribe', [ClientId], TopicTable0), TopicTable = emqttd_broker:foldl_hooks('client.subscribe', [ClientId], TopicTable0),
case TopicTable -- Subscriptions of case TopicTable -- dict:to_list(Subscriptions) of
[] -> [] ->
AckFun([Qos || {_, Qos} <- TopicTable]), AckFun([Qos || {_, Qos} <- TopicTable]),
hibernate(Session); hibernate(Session);
@ -331,21 +330,22 @@ handle_cast({subscribe, TopicTable0, AckFun}, Session = #session{client_id = Cli
?LOG(info, "Subscribe ~p, Granted QoS: ~p", [TopicTable, GrantedQos], Session), ?LOG(info, "Subscribe ~p, Granted QoS: ~p", [TopicTable, GrantedQos], Session),
Subscriptions1 = Subscriptions1 =
lists:foldl(fun({Topic, Qos}, Acc) -> lists:foldl(fun({Topic, Qos}, Dict) ->
case lists:keyfind(Topic, 1, Acc) of case dict:find(Topic, Dict) of
{Topic, Qos} -> {ok, Qos} ->
?LOG(warning, "resubscribe ~s, qos = ~w", [Topic, Qos], Session), ?LOG(warning, "resubscribe ~s, qos = ~w", [Topic, Qos], Session),
Acc; Dict;
{Topic, OldQos} -> {ok, OldQos} ->
?LOG(warning, "resubscribe ~s, old qos=~w, new qos=~w", [Topic, OldQos, Qos], Session), ?LOG(warning, "resubscribe ~s, old qos=~w, new qos=~w", [Topic, OldQos, Qos], Session),
lists:keyreplace(Topic, 1, Acc, {Topic, Qos}); dict:store(Topic, Qos, Dict);
false -> error ->
%%TODO: the design is ugly, rewrite later...:( %%TODO: the design is ugly, rewrite later...:(
%% <MQTT V3.1.1>: 3.8.4 %% <MQTT V3.1.1>: 3.8.4
%% Where the Topic Filter is not identical to any existing Subscriptions filter, %% Where the Topic Filter is not identical to any existing Subscriptions filter,
%% a new Subscription is created and all matching retained messages are sent. %% a new Subscription is created and all matching retained messages are sent.
emqttd_retainer:dispatch(Topic, self()), emqttd_retainer:dispatch(Topic, self()),
[{Topic, Qos} | Acc]
dict:store(Topic, Qos, Dict)
end end
end, Subscriptions, TopicTable), end, Subscriptions, TopicTable),
hibernate(Session#session{subscriptions = Subscriptions1}) hibernate(Session#session{subscriptions = Subscriptions1})
@ -362,13 +362,8 @@ handle_cast({unsubscribe, Topics0}, Session = #session{client_id = ClientId,
?LOG(info, "unsubscribe ~p", [Topics], Session), ?LOG(info, "unsubscribe ~p", [Topics], Session),
Subscriptions1 = Subscriptions1 =
lists:foldl(fun(Topic, Acc) -> lists:foldl(fun(Topic, Dict) ->
case lists:keyfind(Topic, 1, Acc) of dict:erase(Topic, Dict)
{Topic, _Qos} ->
lists:keydelete(Topic, 1, Acc);
false ->
Acc
end
end, Subscriptions, Topics), end, Subscriptions, Topics),
hibernate(Session#session{subscriptions = Subscriptions1}); hibernate(Session#session{subscriptions = Subscriptions1});
@ -485,28 +480,10 @@ handle_cast({pubcomp, PktId}, Session = #session{awaiting_comp = AwaitingComp})
handle_cast(Msg, State) -> handle_cast(Msg, State) ->
?UNEXPECTED_MSG(Msg, State). ?UNEXPECTED_MSG(Msg, State).
%% Queue messages when client is offline %% Dispatch Message
handle_info({dispatch, Msg}, Session = #session{client_pid = undefined, handle_info({dispatch, Topic, Msg}, Session = #session{subscriptions = Subscriptions})
message_queue = Q})
when is_record(Msg, mqtt_message) -> when is_record(Msg, mqtt_message) ->
hibernate(Session#session{message_queue = emqttd_mqueue:in(Msg, Q)}); dispatch(fixqos(Topic, Msg, Subscriptions), Session);
%% Dispatch qos0 message directly to client
handle_info({dispatch, Msg = #mqtt_message{qos = ?QOS_0}},
Session = #session{client_pid = ClientPid}) ->
ClientPid ! {deliver, Msg},
hibernate(Session);
handle_info({dispatch, Msg = #mqtt_message{qos = QoS}},
Session = #session{message_queue = MsgQ})
when QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2 ->
case check_inflight(Session) of
true ->
noreply(deliver(Msg, Session));
false ->
hibernate(Session#session{message_queue = emqttd_mqueue:in(Msg, MsgQ)})
end;
handle_info({timeout, awaiting_ack, PktId}, Session = #session{client_pid = undefined, handle_info({timeout, awaiting_ack, PktId}, Session = #session{client_pid = undefined,
awaiting_ack = AwaitingAck}) -> awaiting_ack = AwaitingAck}) ->
@ -604,6 +581,38 @@ kick(ClientId, OldPid, Pid) ->
%% Clean noproc %% Clean noproc
receive {'EXIT', OldPid, _} -> ok after 0 -> ok end. receive {'EXIT', OldPid, _} -> ok after 0 -> ok end.
%%------------------------------------------------------------------------------
%% Dispatch Messages
%%------------------------------------------------------------------------------
%% Queue message if client disconnected
dispatch(Msg, Session = #session{client_pid = undefined, message_queue = Q}) ->
hibernate(Session#session{message_queue = emqttd_mqueue:in(Msg, Q)});
%% Deliver qos0 message directly to client
dispatch(Msg = #mqtt_message{qos = ?QOS0}, Session = #session{client_pid = ClientPid}) ->
ClientPid ! {deliver, Msg},
hibernate(Session);
dispatch(Msg = #mqtt_message{qos = QoS}, Session = #session{message_queue = MsgQ})
when QoS =:= ?QOS1 orelse QoS =:= ?QOS2 ->
case check_inflight(Session) of
true ->
noreply(deliver(Msg, Session));
false ->
hibernate(Session#session{message_queue = emqttd_mqueue:in(Msg, MsgQ)})
end.
fixqos(Topic, Msg = #mqtt_message{qos = PubQos}, Subscriptions) ->
case dict:find(Topic, Subscriptions) of
{ok, SubQos} when PubQos > SubQos ->
Msg#mqtt_message{qos = SubQos};
{ok, _SubQos} ->
Msg;
error ->
Msg
end.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Check inflight and awaiting_rel %% Check inflight and awaiting_rel
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
@ -723,7 +732,7 @@ sess_info(#session{clean_sess = CleanSess,
timestamp = CreatedAt}) -> timestamp = CreatedAt}) ->
Stats = emqttd_mqueue:stats(MessageQueue), Stats = emqttd_mqueue:stats(MessageQueue),
[{clean_sess, CleanSess}, [{clean_sess, CleanSess},
{subscriptions, Subscriptions}, {subscriptions, dict:to_list(Subscriptions)},
{max_inflight, MaxInflight}, {max_inflight, MaxInflight},
{inflight_queue, length(InflightQueue)}, {inflight_queue, length(InflightQueue)},
{message_queue, proplists:get_value(len, Stats)}, {message_queue, proplists:get_value(len, Stats)},