fix qos_2 subscribe

This commit is contained in:
erylee 2013-01-04 16:29:35 +08:00
parent 97d27e2456
commit ba5e21278e
4 changed files with 68 additions and 37 deletions

View File

@ -93,35 +93,34 @@ handle_call({go, Sock}, _From, _State) ->
handle_cast(Msg, State) -> handle_cast(Msg, State) ->
{stop, {badmsg, Msg}, State}. {stop, {badmsg, Msg}, State}.
handle_info({route, Msg}, #state{socket = Sock} = State) -> handle_info({route, Msg}, #state{socket = Sock, message_id=MsgId} = State) ->
#mqtt_msg{ retain = Retain,
qos = Qos,
topic = Topic,
dup = Dup,
message_id = MessageId,
payload = Payload } = Msg,
{DestQos, SendMsgId} = #mqtt_msg{retain = Retain,
if qos = Qos,
Qos == ?QOS_0 -> {Qos, 0}; topic = Topic,
Qos == ?QOS_1 -> {?QOS_0, MessageId}; dup = Dup,
Qos == ?QOS_2 -> {?QOS_1, MessageId} payload = Payload} = Msg,
end,
%?INFO("~p route: ~p", [ConnName, Msg]), Frame = #mqtt_frame{
%TODO: FIXME LATER fixed = #mqtt_frame_fixed{type = ?PUBLISH,
Frame = #mqtt_frame{fixed = #mqtt_frame_fixed{ qos = Qos,
type = ?PUBLISH, retain = Retain,
qos = DestQos, dup = Dup},
retain = Retain, variable = #mqtt_frame_publish{topic_name = Topic,
dup = Dup }, message_id = if
variable = #mqtt_frame_publish{ Qos == ?QOS_0 -> undefined;
topic_name = Topic, true -> MsgId
message_id = SendMsgId}, end},
payload = Payload }, payload = Payload},
send_frame(Sock, Frame), send_frame(Sock, Frame),
{noreply, State};
if
Qos == ?QOS_0 ->
{noreply, State};
true ->
{noreply, next_msg_id(State)}
end;
handle_info({inet_reply, _Ref, ok}, State) -> handle_info({inet_reply, _Ref, ok}, State) ->
{noreply, State, hibernate}; {noreply, State, hibernate};
@ -204,12 +203,13 @@ process_received_bytes(Bytes,
end. end.
process_frame(Frame = #mqtt_frame{fixed = #mqtt_frame_fixed{type = Type}}, process_frame(Frame = #mqtt_frame{fixed = #mqtt_frame_fixed{type = Type}},
State=#state{keep_alive=KeepAlive}) -> State=#state{client_id=ClientId, keep_alive=KeepAlive}) ->
emqtt_keep_alive:activate(KeepAlive), KeepAlive1 = emqtt_keep_alive:activate(KeepAlive),
case validate_frame(Type, Frame) of case validate_frame(Type, Frame) of
ok -> ok ->
?INFO("frame from ~s: ~p", [ClientId, Frame]),
handle_retained(Type, Frame), handle_retained(Type, Frame),
process_request(Type, Frame, State); process_request(Type, Frame, State#state{keep_alive=KeepAlive1});
{error, Reason} -> {error, Reason} ->
{err, Reason, State} {err, Reason, State}
end. end.
@ -235,7 +235,7 @@ process_request(?CONNECT,
?ERROR_MSG("MQTT login failed - no credentials"), ?ERROR_MSG("MQTT login failed - no credentials"),
{?CONNACK_CREDENTIALS, State}; {?CONNACK_CREDENTIALS, State};
true -> true ->
?INFO("connect from clientid: ~s", [ClientId]), ?INFO("connect from clientid: ~s, ~p", [ClientId, AlivePeriod]),
ok = emqtt_registry:register(ClientId, self()), ok = emqtt_registry:register(ClientId, self()),
KeepAlive = emqtt_keep_alive:new(AlivePeriod*1500, keep_alive_timeout), KeepAlive = emqtt_keep_alive:new(AlivePeriod*1500, keep_alive_timeout),
{?CONNACK_ACCEPT, {?CONNACK_ACCEPT,
@ -276,6 +276,19 @@ process_request(?PUBLISH,
{ok, State}; {ok, State};
process_request(?PUBACK, #mqtt_frame{}, State) ->
%TODO: fixme later
{ok, State};
process_request(?PUBREC, #mqtt_frame{
variable = #mqtt_frame_publish{message_id = MsgId}},
State=#state{socket=Sock}) ->
%TODO: fixme later
send_frame(Sock,
#mqtt_frame{fixed = #mqtt_frame_fixed{ type = ?PUBREL},
variable = #mqtt_frame_publish{ message_id = MsgId}}),
{ok, State};
process_request(?PUBREL, process_request(?PUBREL,
#mqtt_frame{ #mqtt_frame{
fixed = #mqtt_frame_fixed{ qos = ?QOS_1 }, fixed = #mqtt_frame_fixed{ qos = ?QOS_1 },
@ -287,6 +300,11 @@ process_request(?PUBREL,
variable = #mqtt_frame_publish{ message_id = MsgId}}), variable = #mqtt_frame_publish{ message_id = MsgId}}),
{ok, State}; {ok, State};
process_request(?PUBCOMP, #mqtt_frame{
variable = #mqtt_frame_publish{message_id = _MsgId}}, State) ->
%TODO: fixme later
{ok, State};
process_request(?SUBSCRIBE, process_request(?SUBSCRIBE,
#mqtt_frame{ #mqtt_frame{
variable = #mqtt_frame_subscribe{message_id = MessageId, variable = #mqtt_frame_subscribe{message_id = MessageId,
@ -323,7 +341,7 @@ process_request(?UNSUBSCRIBE,
process_request(?PINGREQ, #mqtt_frame{}, #state{socket=Sock, keep_alive=KeepAlive}=State) -> process_request(?PINGREQ, #mqtt_frame{}, #state{socket=Sock, keep_alive=KeepAlive}=State) ->
%Keep alive timer %Keep alive timer
KeepAlive1 = emqtt_keep_alive:reset(KeepAlive), KeepAlive1 = emqtt_keep_alive:reset(KeepAlive),
send_frame(Sock, #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?PINGRESP }}), send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{ type = ?PINGRESP }}),
{ok, State#state{keep_alive=KeepAlive1}}; {ok, State#state{keep_alive=KeepAlive1}};
process_request(?DISCONNECT, #mqtt_frame{}, State=#state{client_id=ClientId}) -> process_request(?DISCONNECT, #mqtt_frame{}, State=#state{client_id=ClientId}) ->
@ -362,9 +380,9 @@ send_frame(Sock, Frame) ->
erlang:port_command(Sock, emqtt_frame:serialise(Frame)). erlang:port_command(Sock, emqtt_frame:serialise(Frame)).
%%---------------------------------------------------------------------------- %%----------------------------------------------------------------------------
network_error(_Reason, network_error(Reason,
State = #state{ conn_name = ConnStr}) -> State = #state{ conn_name = ConnStr}) ->
?INFO("MQTT detected network error for ~p~n", [ConnStr]), ?INFO("MQTT detected network error '~p' for ~p", [Reason, ConnStr]),
send_will_msg(State), send_will_msg(State),
% todo: flush channel after publish % todo: flush channel after publish
stop({shutdown, conn_closed}, State). stop({shutdown, conn_closed}, State).

View File

@ -99,12 +99,21 @@ parse_frame(Bin, #mqtt_frame_fixed{ type = Type,
_ -> <<M:16/big, R/binary>> = Rest1, _ -> <<M:16/big, R/binary>> = Rest1,
{M, R} {M, R}
end, end,
wrap(Fixed, #mqtt_frame_publish { topic_name = TopicName, wrap(Fixed, #mqtt_frame_publish {topic_name = TopicName,
message_id = MessageId }, message_id = MessageId },
Payload, Rest); Payload, Rest);
{?PUBACK, <<FrameBin:Length/binary, Rest/binary>>} ->
<<MessageId:16/big>> = FrameBin,
wrap(Fixed, #mqtt_frame_publish{message_id = MessageId}, Rest);
{?PUBREC, <<FrameBin:Length/binary, Rest/binary>>} ->
<<MessageId:16/big>> = FrameBin,
wrap(Fixed, #mqtt_frame_publish{message_id = MessageId}, Rest);
{?PUBREL, <<FrameBin:Length/binary, Rest/binary>>} -> {?PUBREL, <<FrameBin:Length/binary, Rest/binary>>} ->
<<MessageId:16/big>> = FrameBin, <<MessageId:16/big>> = FrameBin,
wrap(Fixed, #mqtt_frame_publish { message_id = MessageId }, Rest); wrap(Fixed, #mqtt_frame_publish { message_id = MessageId }, Rest);
{?PUBCOMP, <<FrameBin:Length/binary, Rest/binary>>} ->
<<MessageId:16/big>> = FrameBin,
wrap(Fixed, #mqtt_frame_publish { message_id = MessageId }, Rest);
{Subs, <<FrameBin:Length/binary, Rest/binary>>} {Subs, <<FrameBin:Length/binary, Rest/binary>>}
when Subs =:= ?SUBSCRIBE orelse Subs =:= ?UNSUBSCRIBE -> when Subs =:= ?SUBSCRIBE orelse Subs =:= ?UNSUBSCRIBE ->
1 = Qos, 1 = Qos,
@ -199,12 +208,16 @@ serialise_variable(#mqtt_frame_fixed { type = ?PUBACK } = Fixed,
MessageIdBin = <<MessageId:16/big>>, MessageIdBin = <<MessageId:16/big>>,
serialise_fixed(Fixed, MessageIdBin, PayloadBin); serialise_fixed(Fixed, MessageIdBin, PayloadBin);
serialise_variable(#mqtt_frame_fixed { type = ?PUBREC } = Fixed, serialise_variable(#mqtt_frame_fixed { type = ?PUBREC } = Fixed,
#mqtt_frame_publish{ message_id = MsgId}, #mqtt_frame_publish{ message_id = MsgId},
PayloadBin) -> PayloadBin) ->
serialise_fixed(Fixed, <<MsgId:16/big>>, PayloadBin); serialise_fixed(Fixed, <<MsgId:16/big>>, PayloadBin);
serialise_variable(#mqtt_frame_fixed { type = ?PUBREL } = Fixed,
#mqtt_frame_publish{ message_id = MsgId},
PayloadBin) ->
serialise_fixed(Fixed, <<MsgId:16/big>>, PayloadBin);
serialise_variable(#mqtt_frame_fixed { type = ?PUBCOMP } = Fixed, serialise_variable(#mqtt_frame_fixed { type = ?PUBCOMP } = Fixed,
#mqtt_frame_publish{ message_id = MsgId}, #mqtt_frame_publish{ message_id = MsgId},
PayloadBin) -> PayloadBin) ->

View File

@ -37,7 +37,7 @@ state(#keep_alive{state=State}) ->
activate(undefined) -> activate(undefined) ->
undefined; undefined;
activate(KeepAlive) when is_record(KeepAlive, keep_alive) -> activate(KeepAlive) when is_record(KeepAlive, keep_alive) ->
KeepAlive#keep_alive{state=ative}. KeepAlive#keep_alive{state=active}.
reset(undefined) -> reset(undefined) ->
undefined; undefined;

View File

@ -67,7 +67,7 @@ publish(Topic, Msg) when is_list(Topic) and is_record(Msg, mqtt_msg) ->
%route locally, should only be called by publish %route locally, should only be called by publish
route(Topic, Msg) -> route(Topic, Msg) ->
[Client ! {route, Msg} || #subscriber{client=Client} <- ets:lookup(subscriber, Topic)]. [Client ! {route, Msg#mqtt_msg{qos=Qos}} || #subscriber{qos=Qos, client=Client} <- ets:lookup(subscriber, Topic)].
match(Topic) when is_list(Topic) -> match(Topic) when is_list(Topic) ->
TrieNodes = mnesia:async_dirty(fun trie_match/1, [emqtt_topic:words(Topic)]), TrieNodes = mnesia:async_dirty(fun trie_match/1, [emqtt_topic:words(Topic)]),