diff --git a/src/emqtt_client.erl b/src/emqtt_client.erl index fd8859672..8133fadc1 100644 --- a/src/emqtt_client.erl +++ b/src/emqtt_client.erl @@ -93,35 +93,34 @@ handle_call({go, Sock}, _From, _State) -> handle_cast(Msg, State) -> {stop, {badmsg, Msg}, State}. -handle_info({route, Msg}, #state{socket = Sock} = State) -> - #mqtt_msg{ retain = Retain, - qos = Qos, - topic = Topic, - dup = Dup, - message_id = MessageId, - payload = Payload } = Msg, +handle_info({route, Msg}, #state{socket = Sock, message_id=MsgId} = State) -> - {DestQos, SendMsgId} = - if - Qos == ?QOS_0 -> {Qos, 0}; - Qos == ?QOS_1 -> {?QOS_0, MessageId}; - Qos == ?QOS_2 -> {?QOS_1, MessageId} - end, + #mqtt_msg{retain = Retain, + qos = Qos, + topic = Topic, + dup = Dup, + payload = Payload} = Msg, - %?INFO("~p route: ~p", [ConnName, Msg]), - %TODO: FIXME LATER - Frame = #mqtt_frame{fixed = #mqtt_frame_fixed{ - type = ?PUBLISH, - qos = DestQos, - retain = Retain, - dup = Dup }, - variable = #mqtt_frame_publish{ - topic_name = Topic, - message_id = SendMsgId}, - payload = Payload }, + Frame = #mqtt_frame{ + fixed = #mqtt_frame_fixed{type = ?PUBLISH, + qos = Qos, + retain = Retain, + dup = Dup}, + variable = #mqtt_frame_publish{topic_name = Topic, + message_id = if + Qos == ?QOS_0 -> undefined; + true -> MsgId + end}, + payload = Payload}, send_frame(Sock, Frame), - {noreply, State}; + + if + Qos == ?QOS_0 -> + {noreply, State}; + true -> + {noreply, next_msg_id(State)} + end; handle_info({inet_reply, _Ref, ok}, State) -> {noreply, State, hibernate}; @@ -204,12 +203,13 @@ process_received_bytes(Bytes, end. process_frame(Frame = #mqtt_frame{fixed = #mqtt_frame_fixed{type = Type}}, - State=#state{keep_alive=KeepAlive}) -> - emqtt_keep_alive:activate(KeepAlive), + State=#state{client_id=ClientId, keep_alive=KeepAlive}) -> + KeepAlive1 = emqtt_keep_alive:activate(KeepAlive), case validate_frame(Type, Frame) of ok -> + ?INFO("frame from ~s: ~p", [ClientId, Frame]), handle_retained(Type, Frame), - process_request(Type, Frame, State); + process_request(Type, Frame, State#state{keep_alive=KeepAlive1}); {error, Reason} -> {err, Reason, State} end. @@ -235,7 +235,7 @@ process_request(?CONNECT, ?ERROR_MSG("MQTT login failed - no credentials"), {?CONNACK_CREDENTIALS, State}; true -> - ?INFO("connect from clientid: ~s", [ClientId]), + ?INFO("connect from clientid: ~s, ~p", [ClientId, AlivePeriod]), ok = emqtt_registry:register(ClientId, self()), KeepAlive = emqtt_keep_alive:new(AlivePeriod*1500, keep_alive_timeout), {?CONNACK_ACCEPT, @@ -276,6 +276,19 @@ process_request(?PUBLISH, {ok, State}; +process_request(?PUBACK, #mqtt_frame{}, State) -> + %TODO: fixme later + {ok, State}; + +process_request(?PUBREC, #mqtt_frame{ + variable = #mqtt_frame_publish{message_id = MsgId}}, + State=#state{socket=Sock}) -> + %TODO: fixme later + send_frame(Sock, + #mqtt_frame{fixed = #mqtt_frame_fixed{ type = ?PUBREL}, + variable = #mqtt_frame_publish{ message_id = MsgId}}), + {ok, State}; + process_request(?PUBREL, #mqtt_frame{ fixed = #mqtt_frame_fixed{ qos = ?QOS_1 }, @@ -287,6 +300,11 @@ process_request(?PUBREL, variable = #mqtt_frame_publish{ message_id = MsgId}}), {ok, State}; +process_request(?PUBCOMP, #mqtt_frame{ + variable = #mqtt_frame_publish{message_id = _MsgId}}, State) -> + %TODO: fixme later + {ok, State}; + process_request(?SUBSCRIBE, #mqtt_frame{ variable = #mqtt_frame_subscribe{message_id = MessageId, @@ -323,7 +341,7 @@ process_request(?UNSUBSCRIBE, process_request(?PINGREQ, #mqtt_frame{}, #state{socket=Sock, keep_alive=KeepAlive}=State) -> %Keep alive timer KeepAlive1 = emqtt_keep_alive:reset(KeepAlive), - send_frame(Sock, #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?PINGRESP }}), + send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{ type = ?PINGRESP }}), {ok, State#state{keep_alive=KeepAlive1}}; process_request(?DISCONNECT, #mqtt_frame{}, State=#state{client_id=ClientId}) -> @@ -362,9 +380,9 @@ send_frame(Sock, Frame) -> erlang:port_command(Sock, emqtt_frame:serialise(Frame)). %%---------------------------------------------------------------------------- -network_error(_Reason, +network_error(Reason, State = #state{ conn_name = ConnStr}) -> - ?INFO("MQTT detected network error for ~p~n", [ConnStr]), + ?INFO("MQTT detected network error '~p' for ~p", [Reason, ConnStr]), send_will_msg(State), % todo: flush channel after publish stop({shutdown, conn_closed}, State). diff --git a/src/emqtt_frame.erl b/src/emqtt_frame.erl index 6b6c43082..a244c8cd4 100644 --- a/src/emqtt_frame.erl +++ b/src/emqtt_frame.erl @@ -99,12 +99,21 @@ parse_frame(Bin, #mqtt_frame_fixed{ type = Type, _ -> <> = Rest1, {M, R} end, - wrap(Fixed, #mqtt_frame_publish { topic_name = TopicName, - message_id = MessageId }, + wrap(Fixed, #mqtt_frame_publish {topic_name = TopicName, + message_id = MessageId }, Payload, Rest); + {?PUBACK, <>} -> + <> = FrameBin, + wrap(Fixed, #mqtt_frame_publish{message_id = MessageId}, Rest); + {?PUBREC, <>} -> + <> = FrameBin, + wrap(Fixed, #mqtt_frame_publish{message_id = MessageId}, Rest); {?PUBREL, <>} -> <> = FrameBin, wrap(Fixed, #mqtt_frame_publish { message_id = MessageId }, Rest); + {?PUBCOMP, <>} -> + <> = FrameBin, + wrap(Fixed, #mqtt_frame_publish { message_id = MessageId }, Rest); {Subs, <>} when Subs =:= ?SUBSCRIBE orelse Subs =:= ?UNSUBSCRIBE -> 1 = Qos, @@ -199,12 +208,16 @@ serialise_variable(#mqtt_frame_fixed { type = ?PUBACK } = Fixed, MessageIdBin = <>, serialise_fixed(Fixed, MessageIdBin, PayloadBin); - serialise_variable(#mqtt_frame_fixed { type = ?PUBREC } = Fixed, #mqtt_frame_publish{ message_id = MsgId}, PayloadBin) -> serialise_fixed(Fixed, <>, PayloadBin); +serialise_variable(#mqtt_frame_fixed { type = ?PUBREL } = Fixed, + #mqtt_frame_publish{ message_id = MsgId}, + PayloadBin) -> + serialise_fixed(Fixed, <>, PayloadBin); + serialise_variable(#mqtt_frame_fixed { type = ?PUBCOMP } = Fixed, #mqtt_frame_publish{ message_id = MsgId}, PayloadBin) -> diff --git a/src/emqtt_keep_alive.erl b/src/emqtt_keep_alive.erl index 2885180d5..9fbebca05 100644 --- a/src/emqtt_keep_alive.erl +++ b/src/emqtt_keep_alive.erl @@ -37,7 +37,7 @@ state(#keep_alive{state=State}) -> activate(undefined) -> undefined; activate(KeepAlive) when is_record(KeepAlive, keep_alive) -> - KeepAlive#keep_alive{state=ative}. + KeepAlive#keep_alive{state=active}. reset(undefined) -> undefined; diff --git a/src/emqtt_router.erl b/src/emqtt_router.erl index dd314f81c..611038cfc 100644 --- a/src/emqtt_router.erl +++ b/src/emqtt_router.erl @@ -67,7 +67,7 @@ publish(Topic, Msg) when is_list(Topic) and is_record(Msg, mqtt_msg) -> %route locally, should only be called by publish route(Topic, Msg) -> - [Client ! {route, Msg} || #subscriber{client=Client} <- ets:lookup(subscriber, Topic)]. + [Client ! {route, Msg#mqtt_msg{qos=Qos}} || #subscriber{qos=Qos, client=Client} <- ets:lookup(subscriber, Topic)]. match(Topic) when is_list(Topic) -> TrieNodes = mnesia:async_dirty(fun trie_match/1, [emqtt_topic:words(Topic)]),