From 29a8a0f28370d7e6803b031d44d29ff1e5f9fa5e Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Thu, 11 Dec 2014 15:50:18 +0800 Subject: [PATCH] move 'process_request' to emqtt_protocol --- apps/emqtt/include/emqtt_frame.hrl | 17 +++ apps/emqtt/src/emqtt_client.erl | 152 --------------------------- apps/emqtt/src/emqtt_protocol.erl | 162 +++++++++++++++++++++++++++++ 3 files changed, 179 insertions(+), 152 deletions(-) diff --git a/apps/emqtt/include/emqtt_frame.hrl b/apps/emqtt/include/emqtt_frame.hrl index e00191261..1aca3243c 100644 --- a/apps/emqtt/include/emqtt_frame.hrl +++ b/apps/emqtt/include/emqtt_frame.hrl @@ -23,6 +23,8 @@ -define(MQTT_PROTO_MAJOR, 3). -define(MQTT_PROTO_MINOR, 1). +-define(CLIENT_ID_MAXLEN, 23). + %% frame types -define(CONNECT, 1). @@ -49,6 +51,21 @@ -define(CONNACK_CREDENTIALS, 4). %% bad user name or password -define(CONNACK_AUTH, 5). %% not authorized +-record(state, {socket, + conn_name, + await_recv, + connection_state, + conserve, + parse_state, + message_id, + client_id, + clean_sess, + will_msg, + keep_alive, + awaiting_ack, + subtopics, + awaiting_rel}). + -record(mqtt_frame, {fixed, variable, diff --git a/apps/emqtt/src/emqtt_client.erl b/apps/emqtt/src/emqtt_client.erl index daa322fa7..500c66d6b 100644 --- a/apps/emqtt/src/emqtt_client.erl +++ b/apps/emqtt/src/emqtt_client.erl @@ -43,24 +43,6 @@ -include("emqtt_frame.hrl"). --define(CLIENT_ID_MAXLEN, 23). - --record(state, {socket, - conn_name, - await_recv, - connection_state, - conserve, - parse_state, - message_id, - client_id, - clean_sess, - will_msg, - keep_alive, - awaiting_ack, - subtopics, - awaiting_rel}). - - -define(FRAME_TYPE(Frame, Type), Frame = #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }}). @@ -233,140 +215,6 @@ process_frame(Frame = #mqtt_frame{fixed = #mqtt_frame_fixed{type = Type}}, {err, Reason, State} end. -process_request(?CONNECT, - #mqtt_frame{ variable = #mqtt_frame_connect{ - username = Username, - password = Password, - proto_ver = ProtoVersion, - clean_sess = CleanSess, - keep_alive = AlivePeriod, - client_id = ClientId } = Var}, #state{socket = Sock} = State) -> - {ReturnCode, State1} = - case {lists:member(ProtoVersion, proplists:get_keys(?PROTOCOL_NAMES)), - valid_client_id(ClientId)} of - {false, _} -> - {?CONNACK_PROTO_VER, State}; - {_, false} -> - {?CONNACK_INVALID_ID, State}; - _ -> - case emqtt_auth:check(Username, Password) of - false -> - ?ERROR_MSG("MQTT login failed - no credentials"), - {?CONNACK_CREDENTIALS, State}; - true -> - ?INFO("connect from clientid: ~p, ~p", [ClientId, AlivePeriod]), - emqtt_cm:create(ClientId, self()), - KeepAlive = emqtt_keep_alive:new(AlivePeriod*1500, keep_alive_timeout), - {?CONNACK_ACCEPT, - State #state{ will_msg = make_will_msg(Var), - client_id = ClientId, - keep_alive = KeepAlive}} - end - end, - ?INFO("recv conn...:~p", [ReturnCode]), - send_frame(Sock, #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?CONNACK}, - variable = #mqtt_frame_connack{ - return_code = ReturnCode }}), - {ok, State1}; - -process_request(?PUBLISH, Frame=#mqtt_frame{ - fixed = #mqtt_frame_fixed{qos = ?QOS_0}}, State) -> - emqtt_pubsub:publish(make_msg(Frame)), - {ok, State}; - -process_request(?PUBLISH, - Frame=#mqtt_frame{ - fixed = #mqtt_frame_fixed{qos = ?QOS_1}, - variable = #mqtt_frame_publish{message_id = MsgId}}, - State=#state{socket=Sock}) -> - emqtt_pubsub:publish(make_msg(Frame)), - send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{ type = ?PUBACK }, - variable = #mqtt_frame_publish{ message_id = MsgId}}), - {ok, State}; - -process_request(?PUBLISH, - Frame=#mqtt_frame{ - fixed = #mqtt_frame_fixed{qos = ?QOS_2}, - variable = #mqtt_frame_publish{message_id = MsgId}}, - State=#state{socket=Sock}) -> - emqtt_pubsub:publish(make_msg(Frame)), - put({msg, MsgId}, pubrec), - send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{type = ?PUBREC}, - variable = #mqtt_frame_publish{ message_id = MsgId}}), - - {ok, State}; - -process_request(?PUBACK, #mqtt_frame{}, State) -> - %TODO: fixme later - {ok, State}; - -process_request(?PUBREC, #mqtt_frame{ - variable = #mqtt_frame_publish{message_id = MsgId}}, - State=#state{socket=Sock}) -> - %TODO: fixme later - send_frame(Sock, - #mqtt_frame{fixed = #mqtt_frame_fixed{ type = ?PUBREL}, - variable = #mqtt_frame_publish{ message_id = MsgId}}), - {ok, State}; - -process_request(?PUBREL, - #mqtt_frame{ - variable = #mqtt_frame_publish{message_id = MsgId}}, - State=#state{socket=Sock}) -> - erase({msg, MsgId}), - send_frame(Sock, - #mqtt_frame{fixed = #mqtt_frame_fixed{ type = ?PUBCOMP}, - variable = #mqtt_frame_publish{ message_id = MsgId}}), - {ok, State}; - -process_request(?PUBCOMP, #mqtt_frame{ - variable = #mqtt_frame_publish{message_id = _MsgId}}, State) -> - %TODO: fixme later - {ok, State}; - -process_request(?SUBSCRIBE, - #mqtt_frame{ - variable = #mqtt_frame_subscribe{message_id = MessageId, - topic_table = Topics}, - payload = undefined}, - #state{socket=Sock} = State) -> - - [emqtt_pubsub:subscribe({Name, Qos}, self()) || - #mqtt_topic{name=Name, qos=Qos} <- Topics], - - GrantedQos = [Qos || #mqtt_topic{qos=Qos} <- Topics], - - send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{type = ?SUBACK}, - variable = #mqtt_frame_suback{ - message_id = MessageId, - qos_table = GrantedQos}}), - - {ok, State}; - -process_request(?UNSUBSCRIBE, - #mqtt_frame{ - variable = #mqtt_frame_subscribe{message_id = MessageId, - topic_table = Topics }, - payload = undefined}, #state{socket = Sock, client_id = ClientId} = State) -> - - - [emqtt_pubsub:unsubscribe(Name, self()) || #mqtt_topic{name=Name} <- Topics], - - send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{type = ?UNSUBACK }, - variable = #mqtt_frame_suback{message_id = MessageId }}), - - {ok, State}; - -process_request(?PINGREQ, #mqtt_frame{}, #state{socket=Sock, keep_alive=KeepAlive}=State) -> - %Keep alive timer - KeepAlive1 = emqtt_keep_alive:reset(KeepAlive), - send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{ type = ?PINGRESP }}), - {ok, State#state{keep_alive=KeepAlive1}}; - -process_request(?DISCONNECT, #mqtt_frame{}, State=#state{client_id=ClientId}) -> - ?INFO("~s disconnected", [ClientId]), - {stop, State}. - next_msg_id(State = #state{ message_id = 16#ffff }) -> State #state{ message_id = 1 }; next_msg_id(State = #state{ message_id = MsgId }) -> diff --git a/apps/emqtt/src/emqtt_protocol.erl b/apps/emqtt/src/emqtt_protocol.erl index 468b70a22..66af58d4f 100644 --- a/apps/emqtt/src/emqtt_protocol.erl +++ b/apps/emqtt/src/emqtt_protocol.erl @@ -1,5 +1,167 @@ +%%----------------------------------------------------------------------------- +%% Copyright (c) 2014, Feng Lee +%% +%% Permission is hereby granted, free of charge, to any person obtaining a copy +%% of this software and associated documentation files (the "Software"), to deal +%% in the Software without restriction, including without limitation the rights +%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%% copies of the Software, and to permit persons to whom the Software is +%% furnished to do so, subject to the following conditions: +%% +%% The above copyright notice and this permission notice shall be included in all +%% copies or substantial portions of the Software. +%% +%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%% SOFTWARE. +%%------------------------------------------------------------------------------ -module(emqtt_protocol). +-include("emqtt.hrl"). + +-include("emqtt_log.hrl"). + -include("emqtt_frame.hrl"). +-export([process_request/3]). + +process_request(?CONNECT, + #mqtt_frame{ variable = #mqtt_frame_connect{ + username = Username, + password = Password, + proto_ver = ProtoVersion, + clean_sess = CleanSess, + keep_alive = AlivePeriod, + client_id = ClientId } = Var}, #state{socket = Sock} = State) -> + {ReturnCode, State1} = + case {lists:member(ProtoVersion, proplists:get_keys(?PROTOCOL_NAMES)), + valid_client_id(ClientId)} of + {false, _} -> + {?CONNACK_PROTO_VER, State}; + {_, false} -> + {?CONNACK_INVALID_ID, State}; + _ -> + case emqtt_auth:check(Username, Password) of + false -> + ?ERROR_MSG("MQTT login failed - no credentials"), + {?CONNACK_CREDENTIALS, State}; + true -> + ?INFO("connect from clientid: ~p, ~p", [ClientId, AlivePeriod]), + emqtt_cm:create(ClientId, self()), + KeepAlive = emqtt_keep_alive:new(AlivePeriod*1500, keep_alive_timeout), + {?CONNACK_ACCEPT, + State #state{ will_msg = make_will_msg(Var), + client_id = ClientId, + keep_alive = KeepAlive}} + end + end, + ?INFO("recv conn...:~p", [ReturnCode]), + send_frame(Sock, #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?CONNACK}, + variable = #mqtt_frame_connack{ + return_code = ReturnCode }}), + {ok, State1}; + +process_request(?PUBLISH, Frame=#mqtt_frame{ + fixed = #mqtt_frame_fixed{qos = ?QOS_0}}, State) -> + emqtt_pubsub:publish(make_msg(Frame)), + {ok, State}; + +process_request(?PUBLISH, + Frame=#mqtt_frame{ + fixed = #mqtt_frame_fixed{qos = ?QOS_1}, + variable = #mqtt_frame_publish{message_id = MsgId}}, + State=#state{socket=Sock}) -> + emqtt_pubsub:publish(make_msg(Frame)), + send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{ type = ?PUBACK }, + variable = #mqtt_frame_publish{ message_id = MsgId}}), + {ok, State}; + +process_request(?PUBLISH, + Frame=#mqtt_frame{ + fixed = #mqtt_frame_fixed{qos = ?QOS_2}, + variable = #mqtt_frame_publish{message_id = MsgId}}, + State=#state{socket=Sock}) -> + emqtt_pubsub:publish(make_msg(Frame)), + put({msg, MsgId}, pubrec), + send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{type = ?PUBREC}, + variable = #mqtt_frame_publish{ message_id = MsgId}}), + + {ok, State}; + +process_request(?PUBACK, #mqtt_frame{}, State) -> + %TODO: fixme later + {ok, State}; + +process_request(?PUBREC, #mqtt_frame{ + variable = #mqtt_frame_publish{message_id = MsgId}}, + State=#state{socket=Sock}) -> + %TODO: fixme later + send_frame(Sock, + #mqtt_frame{fixed = #mqtt_frame_fixed{ type = ?PUBREL}, + variable = #mqtt_frame_publish{ message_id = MsgId}}), + {ok, State}; + +process_request(?PUBREL, + #mqtt_frame{ + variable = #mqtt_frame_publish{message_id = MsgId}}, + State=#state{socket=Sock}) -> + erase({msg, MsgId}), + send_frame(Sock, + #mqtt_frame{fixed = #mqtt_frame_fixed{ type = ?PUBCOMP}, + variable = #mqtt_frame_publish{ message_id = MsgId}}), + {ok, State}; + +process_request(?PUBCOMP, #mqtt_frame{ + variable = #mqtt_frame_publish{message_id = _MsgId}}, State) -> + %TODO: fixme later + {ok, State}; + +process_request(?SUBSCRIBE, + #mqtt_frame{ + variable = #mqtt_frame_subscribe{message_id = MessageId, + topic_table = Topics}, + payload = undefined}, + #state{socket=Sock} = State) -> + + [emqtt_pubsub:subscribe({Name, Qos}, self()) || + #mqtt_topic{name=Name, qos=Qos} <- Topics], + + GrantedQos = [Qos || #mqtt_topic{qos=Qos} <- Topics], + + send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{type = ?SUBACK}, + variable = #mqtt_frame_suback{ + message_id = MessageId, + qos_table = GrantedQos}}), + + {ok, State}; + +process_request(?UNSUBSCRIBE, + #mqtt_frame{ + variable = #mqtt_frame_subscribe{message_id = MessageId, + topic_table = Topics }, + payload = undefined}, #state{socket = Sock, client_id = ClientId} = State) -> + + + [emqtt_pubsub:unsubscribe(Name, self()) || #mqtt_topic{name=Name} <- Topics], + + send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{type = ?UNSUBACK }, + variable = #mqtt_frame_suback{message_id = MessageId }}), + + {ok, State}; + +process_request(?PINGREQ, #mqtt_frame{}, #state{socket=Sock, keep_alive=KeepAlive}=State) -> + %Keep alive timer + KeepAlive1 = emqtt_keep_alive:reset(KeepAlive), + send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{ type = ?PINGRESP }}), + {ok, State#state{keep_alive=KeepAlive1}}; + +process_request(?DISCONNECT, #mqtt_frame{}, State=#state{client_id=ClientId}) -> + ?INFO("~s disconnected", [ClientId]), + {stop, State}. + +