fix qos_2 bugs
This commit is contained in:
parent
8b85ce4278
commit
97d27e2456
7
CHANGES
7
CHANGES
|
@ -1,3 +1,10 @@
|
|||
Changes with emqtt 0.1.3 04 Jan 2012
|
||||
|
||||
*) Feature: support QOS2 PUBREC, PUBREL,PUBCOMP messages
|
||||
|
||||
*) Bugfix: fix emqtt_frame to encode/decoe PUBREC/PUBREL messages
|
||||
|
||||
|
||||
Changes with emqtt 0.1.2 27 Dec 2012
|
||||
|
||||
*) Feature: release support like riak
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
{application, emqtt,
|
||||
[
|
||||
{description, "erlang mqtt broker"},
|
||||
{vsn, "0.1.2"},
|
||||
{vsn, "0.1.3"},
|
||||
{modules, [
|
||||
emqtt,
|
||||
emqtt_app,
|
||||
|
|
|
@ -76,7 +76,7 @@ handle_call({go, Sock}, _From, _State) ->
|
|||
{ok, ConnStr} = emqtt_net:connection_string(Sock, inbound),
|
||||
%FIXME: merge to registry
|
||||
emqtt_client_monitor:mon(self()),
|
||||
?INFO("accepting MQTT connection (~s)~n", [ConnStr]),
|
||||
?INFO("accepting connection (~s)", [ConnStr]),
|
||||
{reply, ok,
|
||||
control_throttle(
|
||||
#state{ socket = Sock,
|
||||
|
@ -101,17 +101,18 @@ handle_info({route, Msg}, #state{socket = Sock} = State) ->
|
|||
message_id = MessageId,
|
||||
payload = Payload } = Msg,
|
||||
|
||||
SendMsgId =
|
||||
{DestQos, SendMsgId} =
|
||||
if
|
||||
Qos > ?QOS_0 -> MessageId;
|
||||
true -> 0
|
||||
Qos == ?QOS_0 -> {Qos, 0};
|
||||
Qos == ?QOS_1 -> {?QOS_0, MessageId};
|
||||
Qos == ?QOS_2 -> {?QOS_1, MessageId}
|
||||
end,
|
||||
|
||||
|
||||
%?INFO("~p route: ~p", [ConnName, Msg]),
|
||||
%TODO: FIXME LATER
|
||||
Frame = #mqtt_frame{fixed = #mqtt_frame_fixed{
|
||||
type = ?PUBLISH,
|
||||
qos = Qos,
|
||||
qos = DestQos,
|
||||
retain = Retain,
|
||||
dup = Dup },
|
||||
variable = #mqtt_frame_publish{
|
||||
|
@ -234,6 +235,7 @@ process_request(?CONNECT,
|
|||
?ERROR_MSG("MQTT login failed - no credentials"),
|
||||
{?CONNACK_CREDENTIALS, State};
|
||||
true ->
|
||||
?INFO("connect from clientid: ~s", [ClientId]),
|
||||
ok = emqtt_registry:register(ClientId, self()),
|
||||
KeepAlive = emqtt_keep_alive:new(AlivePeriod*1500, keep_alive_timeout),
|
||||
{?CONNACK_ACCEPT,
|
||||
|
@ -324,7 +326,8 @@ process_request(?PINGREQ, #mqtt_frame{}, #state{socket=Sock, keep_alive=KeepAliv
|
|||
send_frame(Sock, #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?PINGRESP }}),
|
||||
{ok, State#state{keep_alive=KeepAlive1}};
|
||||
|
||||
process_request(?DISCONNECT, #mqtt_frame{}, State) ->
|
||||
process_request(?DISCONNECT, #mqtt_frame{}, State=#state{client_id=ClientId}) ->
|
||||
?INFO("~s disconnected", [ClientId]),
|
||||
{stop, State}.
|
||||
|
||||
next_msg_id(State = #state{ message_id = 16#ffff }) ->
|
||||
|
|
|
@ -188,7 +188,8 @@ serialise_variable(#mqtt_frame_fixed { type = ?PUBLISH,
|
|||
TopicBin = serialise_utf(TopicName),
|
||||
MessageIdBin = case Qos of
|
||||
0 -> <<>>;
|
||||
1 -> <<MessageId:16/big>>
|
||||
1 -> <<MessageId:16/big>>;
|
||||
2 -> <<MessageId:16/big>>
|
||||
end,
|
||||
serialise_fixed(Fixed, <<TopicBin/binary, MessageIdBin/binary>>, PayloadBin);
|
||||
|
||||
|
@ -198,6 +199,17 @@ serialise_variable(#mqtt_frame_fixed { type = ?PUBACK } = Fixed,
|
|||
MessageIdBin = <<MessageId:16/big>>,
|
||||
serialise_fixed(Fixed, MessageIdBin, PayloadBin);
|
||||
|
||||
|
||||
serialise_variable(#mqtt_frame_fixed { type = ?PUBREC } = Fixed,
|
||||
#mqtt_frame_publish{ message_id = MsgId},
|
||||
PayloadBin) ->
|
||||
serialise_fixed(Fixed, <<MsgId:16/big>>, PayloadBin);
|
||||
|
||||
serialise_variable(#mqtt_frame_fixed { type = ?PUBCOMP } = Fixed,
|
||||
#mqtt_frame_publish{ message_id = MsgId},
|
||||
PayloadBin) ->
|
||||
serialise_fixed(Fixed, <<MsgId:16/big>>, PayloadBin);
|
||||
|
||||
serialise_variable(#mqtt_frame_fixed {} = Fixed,
|
||||
undefined,
|
||||
<<>> = _PayloadBin) ->
|
||||
|
|
|
@ -61,7 +61,6 @@ handle_call(Req, _From, State) ->
|
|||
{stop, {badreq, Req}, State}.
|
||||
|
||||
handle_cast({register, ClientId, Pid}, State) ->
|
||||
?INFO("register ~p ~p", [ClientId, Pid]),
|
||||
case ets:lookup(client, ClientId) of
|
||||
[{_, {OldPid, MRef}}] ->
|
||||
catch gen_server2:call(OldPid, duplicate_id),
|
||||
|
@ -73,7 +72,6 @@ handle_cast({register, ClientId, Pid}, State) ->
|
|||
{noreply, State};
|
||||
|
||||
handle_cast({unregister, ClientId}, State) ->
|
||||
?INFO("unregister ~p", [ClientId]),
|
||||
case ets:lookup(client, ClientId) of
|
||||
[{_, {_Pid, MRef}}] ->
|
||||
erlang:demonitor(MRef),
|
||||
|
|
Loading…
Reference in New Issue