From 97d27e2456135c82b8aabc1f05c8526084a149df Mon Sep 17 00:00:00 2001 From: erylee Date: Fri, 4 Jan 2013 13:24:18 +0800 Subject: [PATCH] fix qos_2 bugs --- CHANGES | 7 +++++++ src/emqtt.app.src | 2 +- src/emqtt_client.erl | 17 ++++++++++------- src/emqtt_frame.erl | 14 +++++++++++++- src/emqtt_registry.erl | 2 -- 5 files changed, 31 insertions(+), 11 deletions(-) diff --git a/CHANGES b/CHANGES index 6e7e12003..76077e405 100644 --- a/CHANGES +++ b/CHANGES @@ -1,3 +1,10 @@ +Changes with emqtt 0.1.3 04 Jan 2012 + + *) Feature: support QOS2 PUBREC, PUBREL,PUBCOMP messages + + *) Bugfix: fix emqtt_frame to encode/decoe PUBREC/PUBREL messages + + Changes with emqtt 0.1.2 27 Dec 2012 *) Feature: release support like riak diff --git a/src/emqtt.app.src b/src/emqtt.app.src index cf2b7543c..a426ccc9a 100644 --- a/src/emqtt.app.src +++ b/src/emqtt.app.src @@ -1,7 +1,7 @@ {application, emqtt, [ {description, "erlang mqtt broker"}, - {vsn, "0.1.2"}, + {vsn, "0.1.3"}, {modules, [ emqtt, emqtt_app, diff --git a/src/emqtt_client.erl b/src/emqtt_client.erl index 5fa98f413..fd8859672 100644 --- a/src/emqtt_client.erl +++ b/src/emqtt_client.erl @@ -76,7 +76,7 @@ handle_call({go, Sock}, _From, _State) -> {ok, ConnStr} = emqtt_net:connection_string(Sock, inbound), %FIXME: merge to registry emqtt_client_monitor:mon(self()), - ?INFO("accepting MQTT connection (~s)~n", [ConnStr]), + ?INFO("accepting connection (~s)", [ConnStr]), {reply, ok, control_throttle( #state{ socket = Sock, @@ -101,17 +101,18 @@ handle_info({route, Msg}, #state{socket = Sock} = State) -> message_id = MessageId, payload = Payload } = Msg, - SendMsgId = + {DestQos, SendMsgId} = if - Qos > ?QOS_0 -> MessageId; - true -> 0 + Qos == ?QOS_0 -> {Qos, 0}; + Qos == ?QOS_1 -> {?QOS_0, MessageId}; + Qos == ?QOS_2 -> {?QOS_1, MessageId} end, - + %?INFO("~p route: ~p", [ConnName, Msg]), %TODO: FIXME LATER Frame = #mqtt_frame{fixed = #mqtt_frame_fixed{ type = ?PUBLISH, - qos = Qos, + qos = DestQos, retain = Retain, dup = Dup }, variable = #mqtt_frame_publish{ @@ -234,6 +235,7 @@ process_request(?CONNECT, ?ERROR_MSG("MQTT login failed - no credentials"), {?CONNACK_CREDENTIALS, State}; true -> + ?INFO("connect from clientid: ~s", [ClientId]), ok = emqtt_registry:register(ClientId, self()), KeepAlive = emqtt_keep_alive:new(AlivePeriod*1500, keep_alive_timeout), {?CONNACK_ACCEPT, @@ -324,7 +326,8 @@ process_request(?PINGREQ, #mqtt_frame{}, #state{socket=Sock, keep_alive=KeepAliv send_frame(Sock, #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?PINGRESP }}), {ok, State#state{keep_alive=KeepAlive1}}; -process_request(?DISCONNECT, #mqtt_frame{}, State) -> +process_request(?DISCONNECT, #mqtt_frame{}, State=#state{client_id=ClientId}) -> + ?INFO("~s disconnected", [ClientId]), {stop, State}. next_msg_id(State = #state{ message_id = 16#ffff }) -> diff --git a/src/emqtt_frame.erl b/src/emqtt_frame.erl index 401117906..6b6c43082 100644 --- a/src/emqtt_frame.erl +++ b/src/emqtt_frame.erl @@ -188,7 +188,8 @@ serialise_variable(#mqtt_frame_fixed { type = ?PUBLISH, TopicBin = serialise_utf(TopicName), MessageIdBin = case Qos of 0 -> <<>>; - 1 -> <> + 1 -> <>; + 2 -> <> end, serialise_fixed(Fixed, <>, PayloadBin); @@ -198,6 +199,17 @@ serialise_variable(#mqtt_frame_fixed { type = ?PUBACK } = Fixed, MessageIdBin = <>, serialise_fixed(Fixed, MessageIdBin, PayloadBin); + +serialise_variable(#mqtt_frame_fixed { type = ?PUBREC } = Fixed, + #mqtt_frame_publish{ message_id = MsgId}, + PayloadBin) -> + serialise_fixed(Fixed, <>, PayloadBin); + +serialise_variable(#mqtt_frame_fixed { type = ?PUBCOMP } = Fixed, + #mqtt_frame_publish{ message_id = MsgId}, + PayloadBin) -> + serialise_fixed(Fixed, <>, PayloadBin); + serialise_variable(#mqtt_frame_fixed {} = Fixed, undefined, <<>> = _PayloadBin) -> diff --git a/src/emqtt_registry.erl b/src/emqtt_registry.erl index b8afab57d..264a6f60b 100644 --- a/src/emqtt_registry.erl +++ b/src/emqtt_registry.erl @@ -61,7 +61,6 @@ handle_call(Req, _From, State) -> {stop, {badreq, Req}, State}. handle_cast({register, ClientId, Pid}, State) -> - ?INFO("register ~p ~p", [ClientId, Pid]), case ets:lookup(client, ClientId) of [{_, {OldPid, MRef}}] -> catch gen_server2:call(OldPid, duplicate_id), @@ -73,7 +72,6 @@ handle_cast({register, ClientId, Pid}, State) -> {noreply, State}; handle_cast({unregister, ClientId}, State) -> - ?INFO("unregister ~p", [ClientId]), case ets:lookup(client, ClientId) of [{_, {_Pid, MRef}}] -> erlang:demonitor(MRef),