From 8bbce8d0b7981970adebe881633032bb4c8f72fe Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sat, 10 Jan 2015 23:20:55 +0800 Subject: [PATCH] dump --- apps/emqtt/include/emqtt_packet.hrl | 1 + apps/emqtt/src/emqtt_client.erl | 2 +- apps/emqtt/src/emqtt_packet.erl | 78 ++++++++++++++++++ apps/emqtt/src/emqtt_protocol.erl | 120 ++++++++++++++-------------- 4 files changed, 141 insertions(+), 60 deletions(-) diff --git a/apps/emqtt/include/emqtt_packet.hrl b/apps/emqtt/include/emqtt_packet.hrl index 1f1ea2f3c..4dbc2c007 100644 --- a/apps/emqtt/include/emqtt_packet.hrl +++ b/apps/emqtt/include/emqtt_packet.hrl @@ -84,6 +84,7 @@ %%------------------------------------------------------------------------------ -record(mqtt_packet_connect, { proto_ver, + proto_name, will_retain, will_qos, will_flag, diff --git a/apps/emqtt/src/emqtt_client.erl b/apps/emqtt/src/emqtt_client.erl index 7561c1915..a5ceba6f6 100644 --- a/apps/emqtt/src/emqtt_client.erl +++ b/apps/emqtt/src/emqtt_client.erl @@ -75,7 +75,7 @@ handle_call({go, Sock}, _From, #state{socket = Sock}) -> conn_state = running, conserve = false, parse_state = emqtt_packet:initial_state(), - proto_state = emqtt_protocol:initial_state(Sock)}), 10000}; + proto_state = emqtt_protocol:initial_state(Sock, Peername)}), 10000}; handle_call(info, _From, State = #state{ conn_name=ConnName, proto_state = ProtoState}) -> diff --git a/apps/emqtt/src/emqtt_packet.erl b/apps/emqtt/src/emqtt_packet.erl index 820cf50b2..d9135843a 100644 --- a/apps/emqtt/src/emqtt_packet.erl +++ b/apps/emqtt/src/emqtt_packet.erl @@ -31,6 +31,8 @@ -export([parse/2, serialise/1]). +-export([dump/1]). + -define(MAX_LEN, 16#fffffff). -define(HIGHBIT, 2#10000000). -define(LOWBITS, 2#01111111). @@ -86,6 +88,7 @@ parse_frame(Bin, #mqtt_packet_header{ type = Type, wrap(Header, #mqtt_packet_connect{ proto_ver = ProtoVersion, + proto_name = ProtoName, will_retain = bool(WillRetain), will_qos = WillQos, will_flag = bool(WillFlag), @@ -255,3 +258,78 @@ opt(X) when is_integer(X) -> X. protocol_name_approved(Ver, Name) -> lists:member({Ver, Name}, ?PROTOCOL_NAMES). +dump(#mqtt_packet{header = Header, variable = Variable, payload = Payload}) when + Payload =:= undefined orelse Payload =:= <<>> -> + dump_header(Header, dump_variable(Variable)); + +dump(#mqtt_packet{header = Header, variable = Variable, payload = Payload}) -> + dump_header(Header, dump_variable(Variable, Payload)). + +dump_header(#mqtt_packet_header{type = Type, dup = Dup, qos = QoS, retain = Retain}, S) -> + io_lib:format("~s(Qos=~p, Retain=~s, Dup=~s, ~s)", [dump_type(Type), QoS, Retain, Dup, S]). + +dump_variable( #mqtt_packet_connect { + proto_ver = ProtoVer, + proto_name = ProtoName, + will_retain = WillRetain, + will_qos = WillQoS, + will_flag = WillFlag, + clean_sess = CleanSess, + keep_alive = KeepAlive, + client_id = ClientId, + will_topic = WillTopic, + will_msg = WillMsg, + username = Username, + password = Password} ) -> + io_lib:format("ClientId=~s, ProtoName=~s, ProtoVsn=~p, CleanSess=~s, KeepAlive=~p, Username=~s, Password=~s", + [ClientId, ProtoName, ProtoVer, CleanSess, KeepAlive, Username, Password]); %%TODO: Will + +dump_variable( #mqtt_packet_connack { + ack_flags = AckFlags, + return_code = ReturnCode } ) -> + io_lib:format("AckFlags=~p, RetainCode=~p", [AckFlags, ReturnCode]); + +dump_variable( #mqtt_packet_publish { + topic_name = TopicName, + packet_id = PacketId} ) -> + io_lib:format("TopicName=~s, PacketId=~p", [TopicName, PacketId]); + +dump_variable( #mqtt_packet_puback { + packet_id = PacketId } ) -> + io_lib:format("PacketId=~p", [PacketId]); + +dump_variable( #mqtt_packet_subscribe { + packet_id = PacketId, + topic_table = TopicTable }) -> + L = [{Name, QoS} || #mqtt_topic{name = Name, qos = QoS} <- TopicTable], + io_lib:format("PacketId=~p, TopicTable=~p", [PacketId, L]); + +dump_variable( #mqtt_packet_suback { + packet_id = PacketId, + qos_table = QosTable} ) -> + io_lib:format("PacketId=~p, QosTable=~p", [PacketId, QosTable]); + +dump_variable(PacketId) when is_integer(PacketId) -> + io_lib:format("PacketId=~p", [PacketId]); + +%%TODO: not right... +dump_variable(undefined) -> "". + +dump_variable(Variable, Payload) -> + dump_variable(Variable). + +dump_type(?CONNECT) -> "CONNECT"; +dump_type(?CONNACK) -> "CONNACK"; +dump_type(?PUBLISH) -> "PUBLISH"; +dump_type(?PUBACK) -> "PUBACK"; +dump_type(?PUBREC) -> "PUBREC"; +dump_type(?PUBREL) -> "PUBREL"; +dump_type(?PUBCOMP) -> "PUBCOMP"; +dump_type(?SUBSCRIBE) -> "SUBSCRIBE"; +dump_type(?SUBACK) -> "SUBACK"; +dump_type(?UNSUBSCRIBE) -> "UNSUBSCRIBE"; +dump_type(?UNSUBACK) -> "UNSUBACK"; +dump_type(?PINGREQ) -> "PINGREQ"; +dump_type(?PINGRESP) -> "PINGRESP"; +dump_type(?DISCONNECT) -> "DISCONNECT". + diff --git a/apps/emqtt/src/emqtt_protocol.erl b/apps/emqtt/src/emqtt_protocol.erl index f7c78a1cc..88a3ff7f6 100644 --- a/apps/emqtt/src/emqtt_protocol.erl +++ b/apps/emqtt/src/emqtt_protocol.erl @@ -26,9 +26,12 @@ -include("emqtt_packet.hrl"). --record(proto_state, { - socket, - connected = false, %received CONNECT action? +-record(proto_state, { + socket, + peer_name, + connected = false, %received CONNECT action? + proto_vsn, + proto_name, packet_id, client_id, clean_sess, @@ -40,7 +43,7 @@ -type proto_state() :: #proto_state{}. --export([initial_state/1]). +-export([initial_state/2]). -export([handle_packet/2, send_packet/2, client_terminated/1]). @@ -49,24 +52,29 @@ -define(PACKET_TYPE(Packet, Type), Packet = #mqtt_packet { header = #mqtt_packet_header { type = Type }}). -initial_state(Socket) -> +initial_state(Socket, Peername) -> #proto_state{ socket = Socket, + peer_name = Peername, packet_id = 1, subscriptions = [], awaiting_ack = gb_trees:empty(), awaiting_rel = gb_trees:empty() }. -info(#proto_state{ packet_id = PacketId, +info(#proto_state{ proto_vsn = ProtoVsn, + proto_name = ProtoName, + packet_id = PacketId, client_id = ClientId, clean_sess = CleanSess, will_msg = WillMsg, subscriptions= Subs }) -> [ {packet_id, PacketId}, - {client_id, ClientId}, - {clean_sess, CleanSess}, - {will_msg, WillMsg}, + {proto_vsn, ProtoVsn}, + {proto_name, ProtoName}, + {client_id, ClientId}, + {clean_sess, CleanSess}, + {will_msg, WillMsg}, {subscriptions, Subs} ]. -spec handle_packet(Packet, State) -> {ok, NewState} | {error, any()} when @@ -80,7 +88,7 @@ info(#proto_state{ packet_id = PacketId, handle_packet(?PACKET_TYPE(Packet, ?CONNECT), State = #proto_state{connected = false}) -> handle_packet(?CONNECT, Packet, State#proto_state{connected = true}); -handle_packet(?PACKET_TYPE(Packet, ?CONNECT), State = #proto_state{connected = true}) -> +handle_packet(?PACKET_TYPE(_Packet, ?CONNECT), State = #proto_state{connected = true}) -> {error, protocol_bad_connect, State}; %%Received other packets when CONNECT not arrived. @@ -88,8 +96,8 @@ handle_packet(_Packet, State = #proto_state{connected = false}) -> {error, protocol_not_connected, State}; handle_packet(?PACKET_TYPE(Packet, Type), - State = #proto_state{client_id = ClientId}) -> - lager:info("packet from ~s: ~p", [ClientId, Packet]), + State = #proto_state { peer_name = PeerName, client_id = ClientId }) -> + lager:info("RECV from ~s/~s: ~s", [PeerName, ClientId, emqtt_packet:dump(Packet)]), case validate_packet(Type, Packet) of ok -> handle_packet(Type, Packet, State); @@ -97,29 +105,28 @@ handle_packet(?PACKET_TYPE(Packet, Type), {error, Reason, State} end. -handle_packet(?CONNECT, #mqtt_packet { - variable = #mqtt_packet_connect { +handle_packet(?CONNECT, Packet = #mqtt_packet { + variable = #mqtt_packet_connect { username = Username, password = Password, proto_ver = ProtoVersion, clean_sess = CleanSess, keep_alive = KeepAlive, client_id = ClientId } = Var }, - State0 = #proto_state{socket = Sock}) -> - - State = State0#proto_state{client_id = ClientId}, + State = #proto_state{ peer_name = PeerName} ) -> + lager:info("RECV from ~s/~s: ~s", [PeerName, ClientId, emqtt_packet:dump(Packet)]), {ReturnCode, State1} = case {lists:member(ProtoVersion, proplists:get_keys(?PROTOCOL_NAMES)), valid_client_id(ClientId)} of {false, _} -> - {?CONNACK_PROTO_VER, State}; + {?CONNACK_PROTO_VER, State#proto_state{client_id = ClientId}}; {_, false} -> - {?CONNACK_INVALID_ID, State}; + {?CONNACK_INVALID_ID, State#proto_state{client_id = ClientId}}; _ -> case emqtt_auth:check(Username, Password) of false -> lager:error("MQTT login failed - no credentials"), - {?CONNACK_CREDENTIALS, State}; + {?CONNACK_CREDENTIALS, State#proto_state{client_id = ClientId}}; true -> lager:info("connect from clientid: ~s, keepalive: ~p", [ClientId, KeepAlive]), start_keepalive(KeepAlive), @@ -129,10 +136,9 @@ handle_packet(?CONNECT, #mqtt_packet { client_id = ClientId }} end end, - lager:info("[SENT] MQTT CONNACK: ~p", [ReturnCode]), - send_packet(Sock, #mqtt_packet { - header = #mqtt_packet_header { type = ?CONNACK }, - variable = #mqtt_packet_connack{ return_code = ReturnCode }}), + send_packet( #mqtt_packet { + header = #mqtt_packet_header { type = ?CONNACK }, + variable = #mqtt_packet_connack{ return_code = ReturnCode }}, State ), {ok, State1}; handle_packet(?PUBLISH, Packet = #mqtt_packet { @@ -143,21 +149,19 @@ handle_packet(?PUBLISH, Packet = #mqtt_packet { handle_packet(?PUBLISH, Packet = #mqtt_packet { header = #mqtt_packet_header { qos = ?QOS_1 }, variable = #mqtt_packet_publish{packet_id = PacketId}}, - State=#proto_state{socket=Sock}) -> + State) -> emqtt_router:route(make_message(Packet)), - send_packet(Sock, #mqtt_packet { header = #mqtt_packet_header{ type = ?PUBACK }, - variable = #mqtt_packet_puback { packet_id = PacketId}}), + send_packet( make_packet(?PUBACK, PacketId), State ), {ok, State}; handle_packet(?PUBLISH, Packet = #mqtt_packet { header = #mqtt_packet_header { qos = ?QOS_2 }, variable = #mqtt_packet_publish{packet_id = PacketId}}, - State=#proto_state{socket=Sock}) -> + State) -> %%FIXME: this is not right...should store it first... emqtt_router:route(make_message(Packet)), put({msg, PacketId}, pubrec), - send_packet(Sock, #mqtt_packet { header = #mqtt_packet_header{ type = ?PUBREC }, - variable = #mqtt_packet_puback { packet_id = PacketId}}), + send_packet( make_packet(?PUBREC, PacketId), State ), {ok, State}; handle_packet(?PUBACK, #mqtt_packet {}, State) -> @@ -165,25 +169,20 @@ handle_packet(?PUBACK, #mqtt_packet {}, State) -> {ok, State}; handle_packet(?PUBREC, #mqtt_packet { - variable = #mqtt_packet_puback { packet_id = PktId }}, - State=#proto_state{socket=Sock}) -> + variable = #mqtt_packet_puback { packet_id = PacketId }}, + State) -> %FIXME Later: should release the message here - send_packet(Sock, #mqtt_packet { - header = #mqtt_packet_header { type = ?PUBREL}, - variable = #mqtt_packet_puback { packet_id = PktId}}), + send_packet( make_packet(?PUBREL, PacketId), State ), {ok, State}; -handle_packet(?PUBREL, #mqtt_packet { - variable = #mqtt_packet_puback { packet_id = PktId}}, - State=#proto_state{socket=Sock}) -> +handle_packet(?PUBREL, #mqtt_packet { variable = #mqtt_packet_puback { packet_id = PacketId}}, State) -> %%FIXME: not right... - erase({msg, PktId}), - send_packet(Sock, #mqtt_packet { header = #mqtt_packet_header { type = ?PUBCOMP}, - variable = #mqtt_packet_puback { packet_id = PktId}}), + erase({msg, PacketId}), + send_packet( make_packet(?PUBCOMP, PacketId), State ), {ok, State}; handle_packet(?PUBCOMP, #mqtt_packet { - variable = #mqtt_packet_puback{packet_id = _PktId}}, State) -> + variable = #mqtt_packet_puback{packet_id = _PacketId}}, State) -> %TODO: fixme later {ok, State}; @@ -192,7 +191,7 @@ handle_packet(?SUBSCRIBE, #mqtt_packet { packet_id = PacketId, topic_table = Topics}, payload = undefined}, - State = #proto_state{socket=Sock}) -> + State) -> %%FIXME: this is not right... [emqtt_pubsub:subscribe({Name, Qos}, self()) || @@ -200,10 +199,10 @@ handle_packet(?SUBSCRIBE, #mqtt_packet { GrantedQos = [Qos || #mqtt_topic{qos=Qos} <- Topics], - send_packet(Sock, #mqtt_packet { header = #mqtt_packet_header { type = ?SUBACK }, - variable = #mqtt_packet_suback{ - packet_id = PacketId, - qos_table = GrantedQos }}), + send_packet(#mqtt_packet { header = #mqtt_packet_header { type = ?SUBACK }, + variable = #mqtt_packet_suback{ + packet_id = PacketId, + qos_table = GrantedQos }}, State), {ok, State}; @@ -212,27 +211,30 @@ handle_packet(?UNSUBSCRIBE, #mqtt_packet { packet_id = PacketId, topic_table = Topics }, payload = undefined}, - State = #proto_state{socket = Sock, client_id = ClientId}) -> + State = #proto_state{client_id = ClientId}) -> [emqtt_pubsub:unsubscribe(Name, self()) || #mqtt_topic{name=Name} <- Topics], - send_packet(Sock, #mqtt_packet { header = #mqtt_packet_header {type = ?UNSUBACK }, - variable = #mqtt_packet_suback{packet_id = PacketId }}), + send_packet(#mqtt_packet { header = #mqtt_packet_header {type = ?UNSUBACK }, + variable = #mqtt_packet_suback{packet_id = PacketId }}, State), {ok, State}; -handle_packet(?PINGREQ, #mqtt_packet{}, #proto_state{socket=Sock}=State) -> - send_packet(Sock, make_packet(?PINGRESP)), +handle_packet(?PINGREQ, #mqtt_packet{}, State) -> + send_packet(make_packet(?PINGRESP), State), {ok, State}; -handle_packet(?DISCONNECT, #mqtt_packet{}, State=#proto_state{client_id=ClientId}) -> - lager:info("~s disconnected", [ClientId]), +handle_packet(?DISCONNECT, #mqtt_packet{}, State=#proto_state{peer_name = PeerName, client_id = ClientId}) -> + lager:info("RECV from ~s/~s: DISCONNECT", [PeerName, ClientId]), {stop, State}. - make_packet(Type) when Type >= ?CONNECT andalso Type =< ?DISCONNECT -> #mqtt_packet{ header = #mqtt_packet_header { type = Type } }. +make_packet(PubAck, PacketId) when PubAck >= ?PUBACK andalso PubAck =< ?PUBREC -> + #mqtt_packet { header = #mqtt_packet_header { type = PubAck}, + variable = #mqtt_packet_puback { packet_id = PacketId}}. + -spec send_message(Message, State) -> {ok, NewState} when Message :: mqtt_message(), State :: proto_state(), @@ -244,7 +246,7 @@ send_message(Message = #mqtt_message{ topic = Topic, dup = Dup, payload = Payload}, - State = #proto_state{socket = Sock, packet_id = PacketId}) -> + State = #proto_state{packet_id = PacketId}) -> Packet = #mqtt_packet { header = #mqtt_packet_header { @@ -260,7 +262,7 @@ send_message(Message = #mqtt_message{ end }, payload = Payload}, - send_packet(Sock, Packet), + send_packet(Packet, State), if Qos == ?QOS_0 -> {ok, State}; @@ -268,8 +270,8 @@ send_message(Message = #mqtt_message{ {ok, next_packet_id(State)} end. -send_packet(Sock, Packet) -> - lager:info("send packet:~p", [Packet]), +send_packet(Packet, #proto_state{socket = Sock, peer_name = PeerName, client_id = ClientId}) -> + lager:info("SENT to ~s/~s: ~p", [PeerName, ClientId, emqtt_packet:dump(Packet)]), %%FIXME Later... erlang:port_command(Sock, emqtt_packet:serialise(Packet)).