From fd6a350d9cbd2f11a26b632243b67c4a891dd5cc Mon Sep 17 00:00:00 2001 From: Feng Date: Mon, 7 Dec 2015 21:19:12 +0800 Subject: [PATCH] dispatch --- src/emqttd_session.erl | 103 ++++++++++++++++++++++------------------- 1 file changed, 56 insertions(+), 47 deletions(-) diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index 07b9b0359..a2289920d 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -85,9 +85,8 @@ %% Last packet id of the session packet_id = 1, - %%TODO: Removed?? %% Client’s subscriptions. - subscriptions :: list(), + subscriptions :: dict:dict(), %% Inflight qos1, qos2 messages sent to the client but unacked, %% QoS 1 and QoS 2 messages which have been sent to the Client, @@ -246,7 +245,7 @@ init([CleanSess, ClientId, ClientPid]) -> clean_sess = CleanSess, client_id = ClientId, client_pid = ClientPid, - subscriptions = [], + subscriptions = dict:new(), inflight_queue = [], max_inflight = emqttd_opts:g(max_inflight, SessEnv, 0), message_queue = emqttd_mqueue:new(ClientId, QEnv, emqttd_alarm:alarm_fun()), @@ -284,12 +283,12 @@ prioritise_cast(Msg, _Len, _State) -> prioritise_info(Msg, _Len, _State) -> case Msg of - {'EXIT', _, _} -> 10; - expired -> 10; - {timeout, _, _} -> 5; - collect_info -> 2; - {dispatch, _} -> 1; - _ -> 0 + {'EXIT', _, _} -> 10; + expired -> 10; + {timeout, _, _} -> 5; + collect_info -> 2; + {dispatch, _, _} -> 1; + _ -> 0 end. handle_call(info, _From, State) -> @@ -316,7 +315,7 @@ handle_cast({subscribe, TopicTable0, AckFun}, Session = #session{client_id = Cli TopicTable = emqttd_broker:foldl_hooks('client.subscribe', [ClientId], TopicTable0), - case TopicTable -- Subscriptions of + case TopicTable -- dict:to_list(Subscriptions) of [] -> AckFun([Qos || {_, Qos} <- TopicTable]), hibernate(Session); @@ -331,21 +330,22 @@ handle_cast({subscribe, TopicTable0, AckFun}, Session = #session{client_id = Cli ?LOG(info, "Subscribe ~p, Granted QoS: ~p", [TopicTable, GrantedQos], Session), Subscriptions1 = - lists:foldl(fun({Topic, Qos}, Acc) -> - case lists:keyfind(Topic, 1, Acc) of - {Topic, Qos} -> + lists:foldl(fun({Topic, Qos}, Dict) -> + case dict:find(Topic, Dict) of + {ok, Qos} -> ?LOG(warning, "resubscribe ~s, qos = ~w", [Topic, Qos], Session), - Acc; - {Topic, OldQos} -> + Dict; + {ok, OldQos} -> ?LOG(warning, "resubscribe ~s, old qos=~w, new qos=~w", [Topic, OldQos, Qos], Session), - lists:keyreplace(Topic, 1, Acc, {Topic, Qos}); - false -> + dict:store(Topic, Qos, Dict); + error -> %%TODO: the design is ugly, rewrite later...:( %% : 3.8.4 %% Where the Topic Filter is not identical to any existing Subscription’s filter, %% a new Subscription is created and all matching retained messages are sent. emqttd_retainer:dispatch(Topic, self()), - [{Topic, Qos} | Acc] + + dict:store(Topic, Qos, Dict) end end, Subscriptions, TopicTable), hibernate(Session#session{subscriptions = Subscriptions1}) @@ -362,13 +362,8 @@ handle_cast({unsubscribe, Topics0}, Session = #session{client_id = ClientId, ?LOG(info, "unsubscribe ~p", [Topics], Session), Subscriptions1 = - lists:foldl(fun(Topic, Acc) -> - case lists:keyfind(Topic, 1, Acc) of - {Topic, _Qos} -> - lists:keydelete(Topic, 1, Acc); - false -> - Acc - end + lists:foldl(fun(Topic, Dict) -> + dict:erase(Topic, Dict) end, Subscriptions, Topics), hibernate(Session#session{subscriptions = Subscriptions1}); @@ -485,28 +480,10 @@ handle_cast({pubcomp, PktId}, Session = #session{awaiting_comp = AwaitingComp}) handle_cast(Msg, State) -> ?UNEXPECTED_MSG(Msg, State). -%% Queue messages when client is offline -handle_info({dispatch, Msg}, Session = #session{client_pid = undefined, - message_queue = Q}) +%% Dispatch Message +handle_info({dispatch, Topic, Msg}, Session = #session{subscriptions = Subscriptions}) when is_record(Msg, mqtt_message) -> - hibernate(Session#session{message_queue = emqttd_mqueue:in(Msg, Q)}); - -%% Dispatch qos0 message directly to client -handle_info({dispatch, Msg = #mqtt_message{qos = ?QOS_0}}, - Session = #session{client_pid = ClientPid}) -> - ClientPid ! {deliver, Msg}, - hibernate(Session); - -handle_info({dispatch, Msg = #mqtt_message{qos = QoS}}, - Session = #session{message_queue = MsgQ}) - when QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2 -> - - case check_inflight(Session) of - true -> - noreply(deliver(Msg, Session)); - false -> - hibernate(Session#session{message_queue = emqttd_mqueue:in(Msg, MsgQ)}) - end; + dispatch(fixqos(Topic, Msg, Subscriptions), Session); handle_info({timeout, awaiting_ack, PktId}, Session = #session{client_pid = undefined, awaiting_ack = AwaitingAck}) -> @@ -604,6 +581,38 @@ kick(ClientId, OldPid, Pid) -> %% Clean noproc receive {'EXIT', OldPid, _} -> ok after 0 -> ok end. +%%------------------------------------------------------------------------------ +%% Dispatch Messages +%%------------------------------------------------------------------------------ + +%% Queue message if client disconnected +dispatch(Msg, Session = #session{client_pid = undefined, message_queue = Q}) -> + hibernate(Session#session{message_queue = emqttd_mqueue:in(Msg, Q)}); + +%% Deliver qos0 message directly to client +dispatch(Msg = #mqtt_message{qos = ?QOS0}, Session = #session{client_pid = ClientPid}) -> + ClientPid ! {deliver, Msg}, + hibernate(Session); + +dispatch(Msg = #mqtt_message{qos = QoS}, Session = #session{message_queue = MsgQ}) + when QoS =:= ?QOS1 orelse QoS =:= ?QOS2 -> + case check_inflight(Session) of + true -> + noreply(deliver(Msg, Session)); + false -> + hibernate(Session#session{message_queue = emqttd_mqueue:in(Msg, MsgQ)}) + end. + +fixqos(Topic, Msg = #mqtt_message{qos = PubQos}, Subscriptions) -> + case dict:find(Topic, Subscriptions) of + {ok, SubQos} when PubQos > SubQos -> + Msg#mqtt_message{qos = SubQos}; + {ok, _SubQos} -> + Msg; + error -> + Msg + end. + %%------------------------------------------------------------------------------ %% Check inflight and awaiting_rel %%------------------------------------------------------------------------------ @@ -723,7 +732,7 @@ sess_info(#session{clean_sess = CleanSess, timestamp = CreatedAt}) -> Stats = emqttd_mqueue:stats(MessageQueue), [{clean_sess, CleanSess}, - {subscriptions, Subscriptions}, + {subscriptions, dict:to_list(Subscriptions)}, {max_inflight, MaxInflight}, {inflight_queue, length(InflightQueue)}, {message_queue, proplists:get_value(len, Stats)},