diff --git a/include/emqtt_frame.hrl b/include/emqtt_frame.hrl index fb6eebe29..701351a27 100644 --- a/include/emqtt_frame.hrl +++ b/include/emqtt_frame.hrl @@ -98,3 +98,4 @@ message_id, payload}). + diff --git a/src/.emqtt_frame.erl.swp b/src/.emqtt_frame.erl.swp new file mode 100644 index 000000000..6c2212e6f Binary files /dev/null and b/src/.emqtt_frame.erl.swp differ diff --git a/src/emqtt_client.erl b/src/emqtt_client.erl index be7fa4545..8615d6679 100644 --- a/src/emqtt_client.erl +++ b/src/emqtt_client.erl @@ -2,7 +2,7 @@ -behaviour(gen_server2). --export([start_link/0, go/2]). +-export([start_link/0, go/2, info/1]). -export([init/1, handle_call/3, @@ -22,22 +22,16 @@ connection_state, conserve, parse_state, - proc_state }). - --record(proc_state, { socket, - subscriptions, - consumer_tags, - unacked_pubs, - awaiting_ack, - awaiting_seqno, - message_id, - client_id, - clean_sess, - will_msg, - channels, - connection, - exchange }). + message_id, + client_id, + clean_sess, + will_msg, + awaiting_ack, + subscriptions + }). +-define(FRAME_TYPE(Frame, Type), + Frame = #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }}). start_link() -> gen_server2:start_link(?MODULE, [], []). @@ -45,9 +39,19 @@ start_link() -> go(Pid, Sock) -> gen_server2:call(Pid, {go, Sock}). +info(Pid) -> + gen_server2:call(Pid, info). + init([]) -> {ok, undefined, hibernate, {backoff, 1000, 1000, 10000}}. +handle_call(info, _From, #state{conn_name=ConnName, + message_id=MsgId, client_id=ClientId} = State) -> + Info = [{conn_name, ConnName}, + {message_id, MsgId}, + {client_id, ClientId}], + {reply, Info, State}; + handle_call({go, Sock}, _From, _State) -> process_flag(trap_exit, true), ok = throw_on_error( @@ -62,17 +66,19 @@ handle_call({go, Sock}, _From, _State) -> connection_state = running, conserve = false, parse_state = emqtt_frame:initial_state(), - proc_state = emqtt_processor:initial_state(Sock) })}. + message_id = 1, + subscriptions = dict:new(), + awaiting_ack = gb_tree:empty()})}. handle_cast(Msg, State) -> {stop, {badmsg, Msg}, State}. -handle_info({route, Msg}, #state{proc_state=PState} = State) -> +handle_info({route, Msg}, #state{socket = Sock} = State) -> #mqtt_msg{ retain = Retain, qos = Qos, topic = Topic, dup = Dup, - message_id = MessageId, + %message_id = MessageId, payload = Payload } = Msg, Frame = #mqtt_frame{fixed = #mqtt_frame_fixed{ @@ -85,7 +91,7 @@ handle_info({route, Msg}, #state{proc_state=PState} = State) -> message_id = 1}, payload = Payload }, - emqtt_processor:send_client(Frame, PState), + send_frame(Sock, Frame), {noreply, State}; handle_info({inet_reply, _Ref, ok}, State) -> @@ -128,7 +134,6 @@ process_received_bytes(<<>>, State) -> {noreply, State}; process_received_bytes(Bytes, State = #state{ parse_state = ParseState, - proc_state = ProcState, conn_name = ConnStr }) -> case emqtt_frame:parse(Bytes, ParseState) of @@ -137,35 +142,181 @@ process_received_bytes(Bytes, control_throttle( State #state{ parse_state = ParseState1 }), hibernate}; {ok, Frame, Rest} -> - case emqtt_processor:process_frame(Frame, ProcState) of - {ok, ProcState1} -> + case process_frame(Frame, State) of + {ok, State1} -> PS = emqtt_frame:initial_state(), process_received_bytes( Rest, - State #state{ parse_state = PS, - proc_state = ProcState1 }); - {err, Reason, ProcState1} -> - error_logger:info_msg("MQTT protocol error ~p for connection ~p~n", + State1 #state{ parse_state = PS}); + {err, Reason, State1} -> + ?ERROR("MQTT protocol error ~p for connection ~p~n", [Reason, ConnStr]), - stop({shutdown, Reason}, pstate(State, ProcState1)); - {stop, ProcState1} -> - stop(normal, pstate(State, ProcState1)) + stop({shutdown, Reason}, State1); + {stop, State1} -> + stop(normal, State1) end; {error, Error} -> - error_logger:erro_msg("MQTT detected framing error ~p for connection ~p~n", + ?ERROR("MQTT detected framing error ~p for connection ~p~n", [ConnStr, Error]), stop({shutdown, Error}, State) end. -pstate(State = #state {}, PState = #proc_state{}) -> - State #state{ proc_state = PState }. +process_frame(Frame = #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }}, + State ) -> + process_request(Type, Frame, State). + +process_request(?CONNECT, + #mqtt_frame{ variable = #mqtt_frame_connect{ + username = Username, + password = Password, + proto_ver = ProtoVersion, + clean_sess = CleanSess, + client_id = ClientId } = Var}, #state{socket = Sock} = State) -> + ?INFO("connect frame: ~p~n", [Var]), + {ReturnCode, State1} = + case {ProtoVersion =:= ?MQTT_PROTO_MAJOR, + emqtt_util:valid_client_id(ClientId)} of + {false, _} -> + {?CONNACK_PROTO_VER, State}; + {_, false} -> + {?CONNACK_INVALID_ID, State}; + _ -> + case creds(Username, Password) of + nocreds -> + error_logger:error_msg("MQTT login failed - no credentials~n"), + {?CONNACK_CREDENTIALS, State}; + {UserBin, PassBin} -> + {?CONNACK_ACCEPT, + State #state{ will_msg = make_will_msg(Var), + client_id = ClientId }} + end + end, + send_frame(Sock, #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?CONNACK}, + variable = #mqtt_frame_connack{ + return_code = ReturnCode }}), + {ok, State1}; + +process_request(?PUBACK, + #mqtt_frame{ + variable = #mqtt_frame_publish{ message_id = MessageId }}, + #state{awaiting_ack = Awaiting } = State) -> + {ok, State #state{ awaiting_ack = gb_trees:delete( MessageId, Awaiting)}}; + +process_request(?PUBLISH, + #mqtt_frame{ + fixed = #mqtt_frame_fixed{ qos = ?QOS_2 }}, State) -> + {err, qos2_not_supported, State}; + +process_request(?PUBLISH, + #mqtt_frame{ + fixed = #mqtt_frame_fixed{ qos = Qos, + retain = Retain, + dup = Dup }, + variable = #mqtt_frame_publish{ topic_name = Topic, + message_id = MessageId }, + payload = Payload }, #state{ socket=Sock, message_id = MsgId } = State) -> + Msg = #mqtt_msg{ retain = Retain, + qos = Qos, + topic = Topic, + dup = Dup, + message_id = MessageId, + payload = Payload }, + emqtt_router:route(Msg), + + send_frame(Sock, + #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?PUBACK }, + variable = #mqtt_frame_publish{ message_id = MsgId}}), + {ok, State}; + +process_request(?SUBSCRIBE, + #mqtt_frame{ + variable = #mqtt_frame_subscribe{ message_id = MessageId, + topic_table = Topics }, + payload = undefined }, + #state{socket=Sock} = State0) -> + QosResponse = + lists:foldl(fun (#mqtt_topic{ name = TopicName, + qos = Qos }, QosList) -> + SupportedQos = supported_subs_qos(Qos), + [SupportedQos | QosList] + end, [], Topics), + + [emqtt_topic:insert(Name) || #mqtt_topic{name=Name} <- Topics], + + [emqtt_router:insert(#subscriber{topic=emqtt_util:binary(Name), pid=self()}) + || #mqtt_topic{name=Name} <- Topics], + + send_frame(Sock, #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?SUBACK }, + variable = #mqtt_frame_suback{ + message_id = MessageId, + qos_table = QosResponse }}), + + {ok, State0}; + +process_request(?UNSUBSCRIBE, + #mqtt_frame{ + variable = #mqtt_frame_subscribe{ message_id = MessageId, + topic_table = Topics }, + payload = undefined }, #state{ socket = Sock, client_id = ClientId, + subscriptions = Subs0} = State) -> + + + [emqtt_router:delete(#subscriber{topic=Name, pid=self()}) + || #mqtt_topic{name=Name} <- Topics], + + send_frame(Sock, #mqtt_frame{ fixed = #mqtt_frame_fixed { type = ?UNSUBACK }, + variable = #mqtt_frame_suback{ message_id = MessageId }}), + + {ok, State #state{ subscriptions = Subs0 }}; + +process_request(?PINGREQ, #mqtt_frame{}, #state{socket=Sock}=State) -> + send_frame(Sock, #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?PINGRESP }}), + {ok, State}; + +process_request(?DISCONNECT, #mqtt_frame{}, State) -> + {stop, State}. + +next_msg_id(State = #state{ message_id = 16#ffff }) -> + State #state{ message_id = 1 }; +next_msg_id(State = #state{ message_id = MsgId }) -> + State #state{ message_id = MsgId + 1 }. + +maybe_clean_sess(false, _Conn, _ClientId) -> + % todo: establish subscription to deliver old unacknowledged messages + ok. + +%%---------------------------------------------------------------------------- + +make_will_msg(#mqtt_frame_connect{ will_flag = false }) -> + undefined; +make_will_msg(#mqtt_frame_connect{ will_retain = Retain, + will_qos = Qos, + will_topic = Topic, + will_msg = Msg }) -> + #mqtt_msg{ retain = Retain, + qos = Qos, + topic = Topic, + dup = false, + payload = Msg }. + +creds(User, Pass) -> + {User, Pass}. + +supported_subs_qos(?QOS_0) -> ?QOS_0; +supported_subs_qos(?QOS_1) -> ?QOS_1; +supported_subs_qos(?QOS_2) -> ?QOS_1. + +send_will(#state{ will_msg = WillMsg }) -> + ?INFO("willmsg: ~p~n", [WillMsg]). + +send_frame(Sock, Frame) -> + erlang:port_command(Sock, emqtt_frame:serialise(Frame)). %%---------------------------------------------------------------------------- network_error(_Reason, - State = #state{ conn_name = ConnStr, - proc_state = PState }) -> - error_logger:info_msg("MQTT detected network error for ~p~n", [ConnStr]), - emqtt_processor:send_will(PState), + State = #state{ conn_name = ConnStr}) -> + ?INFO("MQTT detected network error for ~p~n", [ConnStr]), + send_will(State), % todo: flush channel after publish stop({shutdown, conn_closed}, State). @@ -189,3 +340,4 @@ control_throttle(State = #state{ connection_state = Flow, stop(Reason, State ) -> {stop, Reason, State}. + diff --git a/src/emqtt_processor.erl b/src/emqtt_processor.erl deleted file mode 100644 index cc277bc04..000000000 --- a/src/emqtt_processor.erl +++ /dev/null @@ -1,224 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License -%% at http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and -%% limitations under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. -%% - -%%Don't send amqp message - --module(emqtt_processor). - --export([info/2, initial_state/1, - process_frame/2, send_client/2, send_will/1]). - --include("emqtt.hrl"). --include("emqtt_frame.hrl"). - --record(proc_state, { socket, - subscriptions, - consumer_tags, - unacked_pubs, - awaiting_ack, - awaiting_seqno, - message_id, - client_id, - clean_sess, - will_msg, - channels, - connection, - exchange }). - --define(FRAME_TYPE(Frame, Type), - Frame = #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }}). - -initial_state(Socket) -> - #proc_state{ unacked_pubs = gb_trees:empty(), - awaiting_ack = gb_trees:empty(), - message_id = 1, - subscriptions = dict:new(), - consumer_tags = {undefined, undefined}, - channels = {undefined, undefined}, - exchange = emqtt_util:env(exchange), - socket = Socket }. - -info(client_id, #proc_state{ client_id = ClientId }) -> ClientId. - -process_frame(Frame = #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }}, - PState ) -> - process_request(Type, Frame, PState). - -process_request(?CONNECT, - #mqtt_frame{ variable = #mqtt_frame_connect{ - username = Username, - password = Password, - proto_ver = ProtoVersion, - clean_sess = CleanSess, - client_id = ClientId } = Var}, PState) -> - error_logger:info_msg("connect frame: ~p~n", [Var]), - {ReturnCode, PState1} = - case {ProtoVersion =:= ?MQTT_PROTO_MAJOR, - emqtt_util:valid_client_id(ClientId)} of - {false, _} -> - {?CONNACK_PROTO_VER, PState}; - {_, false} -> - {?CONNACK_INVALID_ID, PState}; - _ -> - case creds(Username, Password) of - nocreds -> - error_logger:error_msg("MQTT login failed - no credentials~n"), - {?CONNACK_CREDENTIALS, PState}; - {UserBin, PassBin} -> - {?CONNACK_ACCEPT, - PState #proc_state{ will_msg = make_will_msg(Var), - client_id = ClientId }} - end - end, - send_client(#mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?CONNACK}, - variable = #mqtt_frame_connack{ - return_code = ReturnCode }}, PState1), - {ok, PState1}; - -process_request(?PUBACK, - #mqtt_frame{ - variable = #mqtt_frame_publish{ message_id = MessageId }}, - #proc_state{awaiting_ack = Awaiting } = PState) -> - {ok, PState #proc_state{ awaiting_ack = gb_trees:delete( MessageId, Awaiting)}}; - -process_request(?PUBLISH, - #mqtt_frame{ - fixed = #mqtt_frame_fixed{ qos = ?QOS_2 }}, PState) -> - {err, qos2_not_supported, PState}; - -process_request(?PUBLISH, - #mqtt_frame{ - fixed = #mqtt_frame_fixed{ qos = Qos, - retain = Retain, - dup = Dup }, - variable = #mqtt_frame_publish{ topic_name = Topic, - message_id = MessageId }, - payload = Payload }, #proc_state{ message_id = MsgId } = PState) -> - Msg = #mqtt_msg{ retain = Retain, - qos = Qos, - topic = Topic, - dup = Dup, - message_id = MessageId, - payload = Payload }, - emqtt_router:route(Msg), - - send_client( - #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?PUBACK }, - variable = #mqtt_frame_publish{ message_id = MsgId}}, - PState), - {ok, PState}; - - -process_request(?SUBSCRIBE, - #mqtt_frame{ - variable = #mqtt_frame_subscribe{ message_id = MessageId, - topic_table = Topics }, - payload = undefined }, - #proc_state{} = PState0) -> - QosResponse = - lists:foldl(fun (#mqtt_topic{ name = TopicName, - qos = Qos }, QosList) -> - SupportedQos = supported_subs_qos(Qos), - [SupportedQos | QosList] - end, [], Topics), - - [emqtt_topic:insert(Name) || #mqtt_topic{name=Name} <- Topics], - - [emqtt_router:insert(#subscriber{topic=emqtt_util:binary(Name), pid=self()}) - || #mqtt_topic{name=Name} <- Topics], - - send_client(#mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?SUBACK }, - variable = #mqtt_frame_suback{ - message_id = MessageId, - qos_table = QosResponse }}, PState0), - - {ok, PState0}; - -process_request(?UNSUBSCRIBE, - #mqtt_frame{ - variable = #mqtt_frame_subscribe{ message_id = MessageId, - topic_table = Topics }, - payload = undefined }, #proc_state{ client_id = ClientId, - subscriptions = Subs0} = PState) -> - - - [emqtt_router:delete(#subscriber{topic=Name, pid=self()}) - || #mqtt_topic{name=Name} <- Topics], - - send_client(#mqtt_frame{ fixed = #mqtt_frame_fixed { type = ?UNSUBACK }, - variable = #mqtt_frame_suback{ message_id = MessageId }}, - PState), - - {ok, PState #proc_state{ subscriptions = Subs0 }}; - -process_request(?PINGREQ, #mqtt_frame{}, PState) -> - send_client(#mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?PINGRESP }}, - PState), - {ok, PState}; - -process_request(?DISCONNECT, #mqtt_frame{}, PState) -> - {stop, PState}. - - - -next_msg_id(PState = #proc_state{ message_id = 16#ffff }) -> - PState #proc_state{ message_id = 1 }; -next_msg_id(PState = #proc_state{ message_id = MsgId }) -> - PState #proc_state{ message_id = MsgId + 1 }. - -%% decide at which qos level to deliver based on subscription -%% and the message publish qos level. non-MQTT publishes are -%% assumed to be qos 1, regardless of delivery_mode. -delivery_qos(Tag, _Headers, #proc_state{ consumer_tags = {Tag, _} }) -> - {?QOS_0, ?QOS_0}; -delivery_qos(Tag, Headers, #proc_state{ consumer_tags = {_, Tag} }) -> - case emqtt_util:table_lookup(Headers, <<"x-mqtt-publish-qos">>) of - {byte, Qos} -> {lists:min([Qos, ?QOS_1]), ?QOS_1}; - undefined -> {?QOS_1, ?QOS_1} - end. - -maybe_clean_sess(false, _Conn, _ClientId) -> - % todo: establish subscription to deliver old unacknowledged messages - ok. - -%%---------------------------------------------------------------------------- - -make_will_msg(#mqtt_frame_connect{ will_flag = false }) -> - undefined; -make_will_msg(#mqtt_frame_connect{ will_retain = Retain, - will_qos = Qos, - will_topic = Topic, - will_msg = Msg }) -> - #mqtt_msg{ retain = Retain, - qos = Qos, - topic = Topic, - dup = false, - payload = Msg }. - -creds(User, Pass) -> - {User, Pass}. - -supported_subs_qos(?QOS_0) -> ?QOS_0; -supported_subs_qos(?QOS_1) -> ?QOS_1; -supported_subs_qos(?QOS_2) -> ?QOS_1. - -send_will(PState = #proc_state{ will_msg = WillMsg }) -> - error_logger:info_msg("willmsg: ~p~n", [WillMsg]). - - -send_client(Frame, #proc_state{ socket = Sock }) -> - erlang:port_command(Sock, emqtt_frame:serialise(Frame)). -