diff --git a/src/emqtt_client.erl b/src/emqtt_client.erl index e6e2d2171..e7bc50124 100644 --- a/src/emqtt_client.erl +++ b/src/emqtt_client.erl @@ -43,7 +43,8 @@ will_msg, keep_alive, awaiting_ack, - subtopics}). + subtopics, + awaiting_rel}). -define(FRAME_TYPE(Frame, Type), @@ -86,7 +87,8 @@ handle_call({go, Sock}, _From, _State) -> parse_state = emqtt_frame:initial_state(), message_id = 1, subtopics = [], - awaiting_ack = gb_trees:empty()})}. + awaiting_ack = gb_trees:empty(), + awaiting_rel = gb_trees:empty()})}. handle_cast(Msg, State) -> {stop, {badmsg, Msg}, State}. @@ -96,9 +98,17 @@ handle_info({route, Msg}, #state{socket = Sock} = State) -> qos = Qos, topic = Topic, dup = Dup, - %message_id = MessageId, + message_id = MessageId, payload = Payload } = Msg, + + SendMsgId = + if + Qos > ?QOS_0 -> MessageId; + true -> 0 + end, + + %TODO: FIXME LATER Frame = #mqtt_frame{fixed = #mqtt_frame_fixed{ type = ?PUBLISH, qos = Qos, @@ -106,7 +116,7 @@ handle_info({route, Msg}, #state{socket = Sock} = State) -> dup = Dup }, variable = #mqtt_frame_publish{ topic_name = Topic, - message_id = 1}, + message_id = SendMsgId}, payload = Payload }, send_frame(Sock, Frame), @@ -192,11 +202,16 @@ process_received_bytes(Bytes, stop({shutdown, Error}, State) end. -process_frame(Frame = #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }}, +process_frame(Frame = #mqtt_frame{fixed = #mqtt_frame_fixed{type = Type}}, State=#state{keep_alive=KeepAlive}) -> - ?INFO("~p", [Frame]), emqtt_keep_alive:activate(KeepAlive), - process_request(Type, Frame, State). + case validate_frame(Type, Frame) of + ok -> + handle_retained(Type, Frame), + process_request(Type, Frame, State); + {error, Reason} -> + {err, Reason, State} + end. process_request(?CONNECT, #mqtt_frame{ variable = #mqtt_frame_connect{ @@ -232,43 +247,43 @@ process_request(?CONNECT, return_code = ReturnCode }}), {ok, State1}; -process_request(?PUBACK, - #mqtt_frame{ - variable = #mqtt_frame_publish{ message_id = MessageId }}, - #state{awaiting_ack = Awaiting } = State) -> - {ok, State #state{ awaiting_ack = gb_trees:delete( MessageId, Awaiting)}}; +process_request(?PUBLISH, Frame=#mqtt_frame{ + fixed = #mqtt_frame_fixed{qos = ?QOS_0}}, State) -> + emqtt_router:publish(make_msg(Frame)), + {ok, State}; process_request(?PUBLISH, - #mqtt_frame{ - fixed = #mqtt_frame_fixed{ qos = ?QOS_2 }}, State) -> - {err, qos2_not_supported, State}; + Frame=#mqtt_frame{ + fixed = #mqtt_frame_fixed{qos = ?QOS_1}, + variable = #mqtt_frame_publish{message_id = MsgId}}, + State=#state{socket=Sock}) -> + emqtt_router:publish(make_msg(Frame)), + send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{ type = ?PUBACK }, + variable = #mqtt_frame_publish{ message_id = MsgId}}), + {ok, State}; process_request(?PUBLISH, + Frame=#mqtt_frame{ + fixed = #mqtt_frame_fixed{qos = ?QOS_2}, + variable = #mqtt_frame_publish{message_id = MsgId}}, + State=#state{socket=Sock}) -> + emqtt_router:publish(make_msg(Frame)), + put({msg, MsgId}, pubrec), + send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{type = ?PUBREC}, + variable = #mqtt_frame_publish{ message_id = MsgId}}), + + {ok, State}; + +process_request(?PUBREL, #mqtt_frame{ - fixed = #mqtt_frame_fixed{ qos = Qos, - retain = Retain, - dup = Dup }, - variable = #mqtt_frame_publish{ topic_name = Topic, - message_id = MessageId }, - payload = Payload }, #state{ socket=Sock, message_id = MsgId } = State) -> - Msg = #mqtt_msg{ retain = Retain, - qos = Qos, - topic = Topic, - dup = Dup, - message_id = MessageId, - payload = Payload }, - case emqtt_topic:validate({publish, Topic}) of - true -> - emqtt_router:publish(Topic, Msg), - %Retained? - retained(Retain, Topic, Msg); - false -> - ?ERROR("badtopic: ~p", [Topic]) - end, + fixed = #mqtt_frame_fixed{ qos = ?QOS_1 }, + variable = #mqtt_frame_publish{message_id = MsgId}}, + State=#state{socket=Sock}) -> + erase({msg, MsgId}), send_frame(Sock, - #mqtt_frame{fixed = #mqtt_frame_fixed{ type = ?PUBACK }, + #mqtt_frame{fixed = #mqtt_frame_fixed{ type = ?PUBCOMP}, variable = #mqtt_frame_publish{ message_id = MsgId}}), - {ok, State}; + {ok, State}; process_request(?SUBSCRIBE, #mqtt_frame{ @@ -277,43 +292,35 @@ process_request(?SUBSCRIBE, payload = undefined}, #state{socket=Sock} = State) -> - Topics1 = [Topic#mqtt_topic{qos=supported_subs_qos(Qos)} - || Topic = #mqtt_topic{name=Name, qos=Qos} <- Topics, - emqtt_topic:validate({subscribe, Name})], - [emqtt_router:subscribe({Name, Qos}, self()) || - #mqtt_topic{name=Name, qos=Qos} <- Topics1], + #mqtt_topic{name=Name, qos=Qos} <- Topics], - GrantedQos = [Qos || #mqtt_topic{qos=Qos} <- Topics1], + GrantedQos = [Qos || #mqtt_topic{qos=Qos} <- Topics], send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{type = ?SUBACK}, variable = #mqtt_frame_suback{ message_id = MessageId, qos_table = GrantedQos}}), - {ok, State#state{subtopics=Topics1}}; + {ok, State}; process_request(?UNSUBSCRIBE, #mqtt_frame{ variable = #mqtt_frame_subscribe{message_id = MessageId, topic_table = Topics }, - payload = undefined}, #state{socket = Sock, client_id = ClientId, - subtopics = Subs0} = State) -> + payload = undefined}, #state{socket = Sock, client_id = ClientId} = State) -> - [emqtt_router:unsubscribe(Name, self()) || - #mqtt_topic{name=Name} <- Topics, emqtt_topic:validate(Name)], - + [emqtt_router:unsubscribe(Name, self()) || #mqtt_topic{name=Name} <- Topics], + send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{type = ?UNSUBACK }, variable = #mqtt_frame_suback{message_id = MessageId }}), - %TODO: fixme later - {ok, State #state{subtopics = Subs0}}; + {ok, State}; process_request(?PINGREQ, #mqtt_frame{}, #state{socket=Sock, keep_alive=KeepAlive}=State) -> %Keep alive timer KeepAlive1 = emqtt_keep_alive:reset(KeepAlive), - ?INFO("~p", [KeepAlive1]), send_frame(Sock, #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?PINGRESP }}), {ok, State#state{keep_alive=KeepAlive1}}; @@ -337,18 +344,16 @@ make_will_msg(#mqtt_frame_connect{ will_retain = Retain, will_qos = Qos, will_topic = Topic, will_msg = Msg }) -> - #mqtt_msg{ retain = Retain, - qos = Qos, - topic = Topic, - dup = false, - payload = Msg }. + #mqtt_msg{retain = Retain, + qos = Qos, + topic = Topic, + dup = false, + payload = Msg }. -supported_subs_qos(?QOS_0) -> ?QOS_0; -supported_subs_qos(?QOS_1) -> ?QOS_1; -supported_subs_qos(?QOS_2) -> ?QOS_1. - -send_will(#state{ will_msg = WillMsg }) -> - ?INFO("willmsg: ~p~n", [WillMsg]). +send_will_msg(#state{will_msg = undefined}) -> + ignore; +send_will_msg(#state{will_msg = WillMsg }) -> + emqtt_router:publish(WillMsg). send_frame(Sock, Frame) -> erlang:port_command(Sock, emqtt_frame:serialise(Frame)). @@ -357,7 +362,7 @@ send_frame(Sock, Frame) -> network_error(_Reason, State = #state{ conn_name = ConnStr}) -> ?INFO("MQTT detected network error for ~p~n", [ConnStr]), - send_will(State), + send_will_msg(State), % todo: flush channel after publish stop({shutdown, conn_closed}, State). @@ -386,10 +391,53 @@ valid_client_id(ClientId) -> ClientIdLen = length(ClientId), 1 =< ClientIdLen andalso ClientIdLen =< ?CLIENT_ID_MAXLEN. -retained(false, _Topic, _Msg) -> +handle_retained(?PUBLISH, #mqtt_frame{fixed = #mqtt_frame_fixed{retain = false}}) -> ignore; -retained(true, Topic, #mqtt_msg{payload = <<>>}) -> - emqtt_retained:delete(Topic); -retained(true, Topic, Msg) -> - emqtt_retained:insert(Topic, Msg). + +handle_retained(?PUBLISH, #mqtt_frame{ + fixed = #mqtt_frame_fixed{retain = true}, + variable = #mqtt_frame_publish{topic_name = Topic}, + payload= <<>> }) -> + emqtt_retained:delete(Topic); + +handle_retained(?PUBLISH, Frame=#mqtt_frame{ + fixed = #mqtt_frame_fixed{retain = true}, + variable = #mqtt_frame_publish{topic_name = Topic}}) -> + emqtt_retained:insert(Topic, make_msg(Frame)); + +handle_retained(_, _) -> + ignore. + +validate_frame(?PUBLISH, #mqtt_frame{variable = #mqtt_frame_publish{topic_name = Topic}}) -> + case emqtt_topic:validate({publish, Topic}) of + true -> ok; + false -> {error, badtopic} + end; + +validate_frame(?UNSUBSCRIBE, Frame) -> + validate_frame(?SUBSCRIBE, Frame); + +validate_frame(?SUBSCRIBE, #mqtt_frame{variable = #mqtt_frame_subscribe{topic_table = Topics}}) -> + ErrTopics = [Topic || Topic <- Topics, not emqtt_topic:validate({subscribe, Topic})], + case ErrTopics of + [] -> ok; + _ -> ?ERROR("error topics: ~p", [ErrTopics]), {error, badtopic} + end; + +validate_frame(_Type, _Frame) -> + ok. + +make_msg(#mqtt_frame{ + fixed = #mqtt_frame_fixed{qos = Qos, + retain = Retain, + dup = Dup}, + variable = #mqtt_frame_publish{topic_name = Topic, + message_id = MessageId}, + payload = Payload}) -> + #mqtt_msg{retain = Retain, + qos = Qos, + topic = Topic, + dup = Dup, + message_id = MessageId, + payload = Payload}. diff --git a/src/emqtt_frame.erl b/src/emqtt_frame.erl index 5b9a76c87..401117906 100644 --- a/src/emqtt_frame.erl +++ b/src/emqtt_frame.erl @@ -102,7 +102,7 @@ parse_frame(Bin, #mqtt_frame_fixed{ type = Type, wrap(Fixed, #mqtt_frame_publish { topic_name = TopicName, message_id = MessageId }, Payload, Rest); - {?PUBACK, <>} -> + {?PUBREL, <>} -> <> = FrameBin, wrap(Fixed, #mqtt_frame_publish { message_id = MessageId }, Rest); {Subs, <>}