From fcb5ac6df33842f7c48b7c756f82e843c066d614 Mon Sep 17 00:00:00 2001 From: erylee Date: Fri, 21 Dec 2012 11:30:35 +0800 Subject: [PATCH] add mqtt frame --- src/emqtt_client.erl | 135 +++++++++++++++++++++-- src/emqtt_networking.erl | 4 + src/emqtt_processor.erl | 228 +++++++++++++++++++++++++++++++++++++++ src/emqtt_util.erl | 36 +++++++ 4 files changed, 395 insertions(+), 8 deletions(-) create mode 100644 src/emqtt_processor.erl create mode 100644 src/emqtt_util.erl diff --git a/src/emqtt_client.erl b/src/emqtt_client.erl index 6629914ad..390872c4c 100644 --- a/src/emqtt_client.erl +++ b/src/emqtt_client.erl @@ -13,27 +13,76 @@ -include("emqtt.hrl"). -go(Pid, Sock) -> - gen_server2:call(Pid, {go, Sock}). +-define(CLIENT_ID_MAXLEN, 23). + +-record(state, { socket, + conn_name, + await_recv, + connection_state, + conserve, + parse_state, + proc_state }). + +-record(proc_state, { socket, + subscriptions, + consumer_tags, + unacked_pubs, + awaiting_ack, + awaiting_seqno, + message_id, + client_id, + clean_sess, + will_msg, + channels, + connection, + exchange }). + start_link() -> gen_server2:start_link(?MODULE, [], []). +go(Pid, Sock) -> + gen_server2:call(Pid, {go, Sock}). + init([]) -> {ok, undefined, hibernate, {backoff, 1000, 1000, 10000}}. -handle_call({go, Sock}, _From, State) -> +handle_call({go, Sock}, _From, _State) -> process_flag(trap_exit, true), ok = throw_on_error( inet_error, fun () -> emqtt_net:tune_buffer_size(Sock) end), {ok, ConnStr} = emqtt_net:connection_string(Sock, inbound), error_logger:info_msg("accepting MQTT connection (~s)~n", [ConnStr]), - %inet:setopts(Sock, [{active, once}]), - {reply, ok, State}. + control_throttle( + #state{ socket = Sock, + conn_name = ConnStr, + await_recv = false, + connection_state = running, + conserve = false, + parse_state = emqtt_frame:initial_state(), + proc_state = emqtt_processor:initial_state(Sock) }). handle_cast(Msg, State) -> {stop, {badmsg, Msg}, State}. +handle_info({route, Msg}, State) -> + emqtt_processor:send_client(Msg), + {noreply, State}; + +handle_info({inet_reply, _Ref, ok}, State) -> + {noreply, State, hibernate}; + +handle_info({inet_async, Sock, _Ref, {ok, Data}}, #state{ socket = Sock }=State) -> + process_received_bytes( + Data, control_throttle(State #state{ await_recv = false })); + +handle_info({inet_async, _Sock, _Ref, {error, Reason}}, State) -> + network_error(Reason, State); + +handle_info({inet_reply, _Sock, {error, Reason}}, State) -> + error_logger:info_msg("sock error: ~p~n", [Reason]), + {noreply, State}; + handle_info(Info, State) -> {stop, {badinfo, Info}, State}. @@ -45,9 +94,79 @@ code_change(_OldVsn, State, _Extra) -> throw_on_error(E, Thunk) -> case Thunk() of - {error, Reason} -> throw({E, Reason}); - {ok, Res} -> Res; - Res -> Res + {error, Reason} -> throw({E, Reason}); + {ok, Res} -> Res; + Res -> Res end. +async_recv(Sock, Length, infinity) when is_port(Sock) -> + prim_inet:async_recv(Sock, Length, -1); + +async_recv(Sock, Length, Timeout) when is_port(Sock) -> + prim_inet:async_recv(Sock, Length, Timeout). + +process_received_bytes(<<>>, State) -> + {noreply, State}; +process_received_bytes(Bytes, + State = #state{ parse_state = ParseState, + proc_state = ProcState, + conn_name = ConnStr }) -> + case + emqtt_frame:parse(Bytes, ParseState) of + {more, ParseState1} -> + {noreply, + control_throttle( State #state{ parse_state = ParseState1 }), + hibernate}; + {ok, Frame, Rest} -> + case emqtt_processor:process_frame(Frame, ProcState) of + {ok, ProcState1} -> + PS = emqtt_frame:initial_state(), + process_received_bytes( + Rest, + State #state{ parse_state = PS, + proc_state = ProcState1 }); + {err, Reason, ProcState1} -> + error_logger:info_msg("MQTT protocol error ~p for connection ~p~n", + [Reason, ConnStr]), + stop({shutdown, Reason}, pstate(State, ProcState1)); + {stop, ProcState1} -> + stop(normal, pstate(State, ProcState1)) + end; + {error, Error} -> + error_logger:erro_msg("MQTT detected framing error ~p for connection ~p~n", + [ConnStr, Error]), + stop({shutdown, Error}, State) + end. + +pstate(State = #state {}, PState = #proc_state{}) -> + State #state{ proc_state = PState }. + +%%---------------------------------------------------------------------------- +network_error(_Reason, + State = #state{ conn_name = ConnStr, + proc_state = PState }) -> + error_logger:info_msg("MQTT detected network error for ~p~n", [ConnStr]), + emqtt_processor:send_will(PState), + % todo: flush channel after publish + stop({shutdown, conn_closed}, State). + +run_socket(State = #state{ connection_state = blocked }) -> + State; +run_socket(State = #state{ await_recv = true }) -> + State; +run_socket(State = #state{ socket = Sock }) -> + async_recv(Sock, 0, infinity), + State#state{ await_recv = true }. + +control_throttle(State = #state{ connection_state = Flow, + conserve = Conserve }) -> + case {Flow, Conserve orelse credit_flow:blocked()} of + {running, true} -> State #state{ connection_state = blocked }; + {blocked, false} -> run_socket(State #state{ + connection_state = running }); + {_, _} -> run_socket(State) + end. + +stop(Reason, State ) -> + {stop, Reason, State}. diff --git a/src/emqtt_networking.erl b/src/emqtt_networking.erl index a8493f9c0..2c8998dfe 100644 --- a/src/emqtt_networking.erl +++ b/src/emqtt_networking.erl @@ -89,6 +89,10 @@ start_client(Sock) -> {ok, Client} = supervisor:start_child(emqtt_client_sup, []), ok = gen_tcp:controlling_process(Sock, Client), emqtt_client:go(Client, Sock), + + %% see comment in rabbit_networking:start_client/2 + gen_event:which_handlers(error_logger), + Client. %%-------------------------------------------------------------------- diff --git a/src/emqtt_processor.erl b/src/emqtt_processor.erl new file mode 100644 index 000000000..798ac6ca3 --- /dev/null +++ b/src/emqtt_processor.erl @@ -0,0 +1,228 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% + +%%Don't send amqp message + +-module(emqtt_processor). + +-export([info/2, initial_state/1, + process_frame/2, send_will/1]). + +-include("emqtt.hrl"). +-include("emqtt_frame.hrl"). + +-record(proc_state, { socket, + subscriptions, + consumer_tags, + unacked_pubs, + awaiting_ack, + awaiting_seqno, + message_id, + client_id, + clean_sess, + will_msg, + channels, + connection, + exchange }). + +-define(FRAME_TYPE(Frame, Type), + Frame = #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }}). + +initial_state(Socket) -> + #proc_state{ unacked_pubs = gb_trees:empty(), + awaiting_ack = gb_trees:empty(), + message_id = 1, + subscriptions = dict:new(), + consumer_tags = {undefined, undefined}, + channels = {undefined, undefined}, + exchange = emqtt_util:env(exchange), + socket = Socket }. + +info(client_id, #proc_state{ client_id = ClientId }) -> ClientId. + +process_frame(#mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }}, + PState = #proc_state{ connection = undefined } ) + when Type =/= ?CONNECT -> + {err, connect_expected, PState}; + +process_frame(Frame = #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }}, + PState ) -> + process_request(Type, Frame, PState). + +process_request(?CONNECT, + #mqtt_frame{ variable = #mqtt_frame_connect{ + username = Username, + password = Password, + proto_ver = ProtoVersion, + clean_sess = CleanSess, + client_id = ClientId } = Var}, PState) -> + {ReturnCode, PState1} = + case {ProtoVersion =:= ?MQTT_PROTO_MAJOR, + emqtt_util:valid_client_id(ClientId)} of + {false, _} -> + {?CONNACK_PROTO_VER, PState}; + {_, false} -> + {?CONNACK_INVALID_ID, PState}; + _ -> + case creds(Username, Password) of + nocreds -> + error_logger:error_msg("MQTT login failed - no credentials~n"), + {?CONNACK_CREDENTIALS, PState}; + {UserBin, PassBin} -> + {?CONNACK_ACCEPT, + PState #proc_state{ will_msg = make_will_msg(Var), + client_id = ClientId }} + end + end, + send_client(#mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?CONNACK}, + variable = #mqtt_frame_connack{ + return_code = ReturnCode }}, PState1), + {ok, PState1}; + +process_request(?PUBACK, + #mqtt_frame{ + variable = #mqtt_frame_publish{ message_id = MessageId }}, + #proc_state{awaiting_ack = Awaiting } = PState) -> + {ok, PState #proc_state{ awaiting_ack = gb_trees:delete( MessageId, Awaiting)}}; + +process_request(?PUBLISH, + #mqtt_frame{ + fixed = #mqtt_frame_fixed{ qos = ?QOS_2 }}, PState) -> + {err, qos2_not_supported, PState}; + +process_request(?PUBLISH, + #mqtt_frame{ + fixed = #mqtt_frame_fixed{ qos = Qos, + retain = Retain, + dup = Dup }, + variable = #mqtt_frame_publish{ topic_name = Topic, + message_id = MessageId }, + payload = Payload }, PState) -> + Msg = #mqtt_msg{ retain = Retain, + qos = Qos, + topic = Topic, + dup = Dup, + message_id = MessageId, + payload = Payload }, + emqtt_router:route(Msg), + + send_client( + #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?PUBACK }, + variable = #mqtt_frame_publish{ message_id = MessageId }}, + PState), + {ok, PState}; + + +process_request(?SUBSCRIBE, + #mqtt_frame{ + variable = #mqtt_frame_subscribe{ message_id = MessageId, + topic_table = Topics }, + payload = undefined }, + #proc_state{} = PState0) -> + QosResponse = + lists:foldl(fun (#mqtt_topic{ name = TopicName, + qos = Qos }, QosList) -> + SupportedQos = supported_subs_qos(Qos), + [SupportedQos | QosList] + end, [], Topics), + + [emqtt_topic:insert(Name) || #mqtt_topic{name=Name} <- Topics], + + [emqtt_router:insert(#subscriber{topic=Name, pid=self()}) + || #mqtt_topic{name=Name} <- Topics], + + send_client(#mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?SUBACK }, + variable = #mqtt_frame_suback{ + message_id = MessageId, + qos_table = QosResponse }}, PState0), + + {ok, PState0}; + +process_request(?UNSUBSCRIBE, + #mqtt_frame{ + variable = #mqtt_frame_subscribe{ message_id = MessageId, + topic_table = Topics }, + payload = undefined }, #proc_state{ client_id = ClientId, + subscriptions = Subs0} = PState) -> + + + [emqtt_router:delete(#subscriber{topic=Name, pid=self()}) + || #mqtt_topic{name=Name} <- Topics], + + send_client(#mqtt_frame{ fixed = #mqtt_frame_fixed { type = ?UNSUBACK }, + variable = #mqtt_frame_suback{ message_id = MessageId }}, + PState), + + {ok, PState #proc_state{ subscriptions = Subs0 }}; + +process_request(?PINGREQ, #mqtt_frame{}, PState) -> + send_client(#mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?PINGRESP }}, + PState), + {ok, PState}; + +process_request(?DISCONNECT, #mqtt_frame{}, PState) -> + {stop, PState}. + + + +next_msg_id(PState = #proc_state{ message_id = 16#ffff }) -> + PState #proc_state{ message_id = 1 }; +next_msg_id(PState = #proc_state{ message_id = MsgId }) -> + PState #proc_state{ message_id = MsgId + 1 }. + +%% decide at which qos level to deliver based on subscription +%% and the message publish qos level. non-MQTT publishes are +%% assumed to be qos 1, regardless of delivery_mode. +delivery_qos(Tag, _Headers, #proc_state{ consumer_tags = {Tag, _} }) -> + {?QOS_0, ?QOS_0}; +delivery_qos(Tag, Headers, #proc_state{ consumer_tags = {_, Tag} }) -> + case emqtt_util:table_lookup(Headers, <<"x-mqtt-publish-qos">>) of + {byte, Qos} -> {lists:min([Qos, ?QOS_1]), ?QOS_1}; + undefined -> {?QOS_1, ?QOS_1} + end. + +maybe_clean_sess(false, _Conn, _ClientId) -> + % todo: establish subscription to deliver old unacknowledged messages + ok. + +%%---------------------------------------------------------------------------- + +make_will_msg(#mqtt_frame_connect{ will_flag = false }) -> + undefined; +make_will_msg(#mqtt_frame_connect{ will_retain = Retain, + will_qos = Qos, + will_topic = Topic, + will_msg = Msg }) -> + #mqtt_msg{ retain = Retain, + qos = Qos, + topic = Topic, + dup = false, + payload = Msg }. + +creds(User, Pass) -> + {User, Pass}. + +supported_subs_qos(?QOS_0) -> ?QOS_0; +supported_subs_qos(?QOS_1) -> ?QOS_1; +supported_subs_qos(?QOS_2) -> ?QOS_1. + +send_will(PState = #proc_state{ will_msg = WillMsg }) -> + error_logger:info_msg("willmsg: ~p~n", [WillMsg]). + + +send_client(Frame, #proc_state{ socket = Sock }) -> + erlang:port_command(Sock, emqtt_frame:serialise(Frame)). + diff --git a/src/emqtt_util.erl b/src/emqtt_util.erl new file mode 100644 index 000000000..378eb22c0 --- /dev/null +++ b/src/emqtt_util.erl @@ -0,0 +1,36 @@ +-module(emqtt_util). + +-include("emqtt.hrl"). + +-define(CLIENT_ID_MAXLEN, 23). + +-compile(export_all). + +subcription_queue_name(ClientId) -> + Base = "mqtt-subscription-" ++ ClientId ++ "qos", + {list_to_binary(Base ++ "0"), list_to_binary(Base ++ "1")}. + +%% amqp mqtt descr +%% * + match one topic level +%% # # match multiple topic levels +%% . / topic level separator +mqtt2amqp(Topic) -> + erlang:iolist_to_binary( + re:replace(re:replace(Topic, "/", ".", [global]), + "[\+]", "*", [global])). + +amqp2mqtt(Topic) -> + erlang:iolist_to_binary( + re:replace(re:replace(Topic, "[\*]", "+", [global]), + "[\.]", "/", [global])). + +valid_client_id(ClientId) -> + ClientIdLen = length(ClientId), + 1 =< ClientIdLen andalso ClientIdLen =< ?CLIENT_ID_MAXLEN. + +env(Key) -> + case application:get_env(emqtt, Key) of + {ok, Val} -> Val; + undefined -> undefined + end. +