diff --git a/apps/emqtt/src/emqtt_app.erl b/apps/emqtt/src/emqtt_app.erl index ff3400a56..abf59a323 100644 --- a/apps/emqtt/src/emqtt_app.erl +++ b/apps/emqtt/src/emqtt_app.erl @@ -73,7 +73,9 @@ start_servers(Sup) -> start_child(Sup, Server, Opts), ?PRINT_MSG("[done]~n") end, - [{"emqtt cm", emqtt_cm}, + [{"emqtt config", emqtt_config}, + {"emqtt client manager", emqtt_cm}, + {"emqtt session manager", emqtt_sm}, {"emqtt auth", emqtt_auth}, {"emqtt retained", emqtt_retained}, {"emqtt pubsub", emqtt_pubsub}, diff --git a/apps/emqtt/src/emqtt_client.erl b/apps/emqtt/src/emqtt_client.erl index c74d1c606..a00a52e85 100644 --- a/apps/emqtt/src/emqtt_client.erl +++ b/apps/emqtt/src/emqtt_client.erl @@ -103,7 +103,8 @@ handle_info({dispatch, Message}, #state{proto_state = ProtoState} = State) -> handle_info({inet_reply, _Ref, ok}, State) -> {noreply, State, hibernate}; -handle_info({inet_async, Sock, _Ref, {ok, Data}}, #state{ socket = Sock}=State) -> +handle_info({inet_async, Sock, _Ref, {ok, Data}}, #state{ peer_name = PeerName, socket = Sock } = State) -> + lager:debug("RECV from ~s: ~p", [State#state.peer_name, Data]), process_received_bytes( Data, control_throttle(State #state{ await_recv = false })); diff --git a/apps/emqtt/src/emqtt_config.erl b/apps/emqtt/src/emqtt_config.erl new file mode 100644 index 000000000..0a18e922f --- /dev/null +++ b/apps/emqtt/src/emqtt_config.erl @@ -0,0 +1,82 @@ +%%----------------------------------------------------------------------------- +%% Copyright (c) 2012-2015, Feng Lee +%% +%% Permission is hereby granted, free of charge, to any person obtaining a copy +%% of this software and associated documentation files (the "Software"), to deal +%% in the Software without restriction, including without limitation the rights +%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%% copies of the Software, and to permit persons to whom the Software is +%% furnished to do so, subject to the following conditions: +%% +%% The above copyright notice and this permission notice shall be included in all +%% copies or substantial portions of the Software. +%% +%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%% SOFTWARE. +%%------------------------------------------------------------------------------ + +-module(emqtt_config). + +-export([lookup/1]). + +-behaviour(gen_server). + +-define(SERVER, ?MODULE). + +%% ------------------------------------------------------------------ +%% API Function Exports +%% ------------------------------------------------------------------ + +-export([start_link/0]). + + +%% ------------------------------------------------------------------ +%% gen_server Function Exports +%% ------------------------------------------------------------------ + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +%% ------------------------------------------------------------------ +%% API Function Definitions +%% ------------------------------------------------------------------ + +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +%%TODO: fix later... +lookup(Key) -> {ok, Key}. + +%% ------------------------------------------------------------------ +%% gen_server Function Definitions +%% ------------------------------------------------------------------ + +init(_Args) -> + ets:new(?MODULE, [set, protected, named_table]), + %%TODO: Load application config. + {ok, none}. + +handle_call(_Request, _From, State) -> + {reply, ok, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%% ------------------------------------------------------------------ +%% Internal Function Definitions +%% ------------------------------------------------------------------ + diff --git a/apps/emqtt/src/emqtt_protocol.erl b/apps/emqtt/src/emqtt_protocol.erl index b794bb9fe..51c255990 100644 --- a/apps/emqtt/src/emqtt_protocol.erl +++ b/apps/emqtt/src/emqtt_protocol.erl @@ -270,8 +270,10 @@ send_message(Message = #mqtt_message{ send_packet(Packet, #proto_state{socket = Sock, peer_name = PeerName, client_id = ClientId}) -> lager:info("SENT to ~s@~s: ~s", [ClientId, PeerName, emqtt_packet:dump(Packet)]), + Data = emqtt_packet:serialise(Packet), + lager:debug("SENT to ~s: ~p", [PeerName, Data]), %%FIXME Later... - erlang:port_command(Sock, emqtt_packet:serialise(Packet)). + erlang:port_command(Sock, Data). %%TODO: fix me later... client_terminated(#proto_state{client_id = ClientId} = State) -> diff --git a/apps/emqtt/test/emqtt_packet_tests.erl b/apps/emqtt/test/emqtt_packet_tests.erl index f561a86b4..b9e516c91 100644 --- a/apps/emqtt/test/emqtt_packet_tests.erl +++ b/apps/emqtt/test/emqtt_packet_tests.erl @@ -21,14 +21,58 @@ %%------------------------------------------------------------------------------ -module(emqtt_packet_tests). +-include("emqtt_packet.hrl"). + +-import(emqtt_packet, [initial_state/0, parse/2, serialise/1]). + -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). -decode_test() -> +parse_connect_test() -> + State = initial_state(), + %% CONNECT(Qos=0, Retain=false, Dup=false, ClientId=mosqpub/10451-iMac.loca, ProtoName=MQIsdp, ProtoVsn=3, CleanSess=true, KeepAlive=60, Username=undefined, Password=undefined) + V31ConnBin = <<16,37,0,6,77,81,73,115,100,112,3,2,0,60,0,23,109,111,115,113,112,117,98,47,49,48,52,53,49,45,105,77,97,99,46,108,111,99,97>>, + %% CONNECT(Qos=0, Retain=false, Dup=false, ClientId=mosqpub/10451-iMac.loca, ProtoName=MQTT, ProtoVsn=4, CleanSess=true, KeepAlive=60, Username=undefined, Password=undefined) + V311ConnBin = <<16,35,0,4,77,81,84,84,4,2,0,60,0,23,109,111,115,113,112,117,98,47,49,48,52,53,49,45,105,77,97,99,46,108,111,99,97>>, + + ?assertMatch({ok, #mqtt_packet{}, <<>>}, parse(V31ConnBin, State)), + ?assertMatch({ok, #mqtt_packet{}, <<>>}, parse(V311ConnBin, State)), + + %%CONNECT(Qos=0, Retain=false, Dup=false, ClientId=mosqpub/10452-iMac.loca, ProtoName=MQIsdp, ProtoVsn=3, CleanSess=true, KeepAlive=60, Username=test, Password=******, Will(Qos=1, Retain=false, Topic=/will, Msg=willmsg)) + ConnBinWithWill = <<16,67,0,6,77,81,73,115,100,112,3,206,0,60,0,23,109,111,115,113,112,117,98,47,49,48,52,53,50,45,105,77,97,99,46,108,111,99,97,0,5,47,119,105,108,108,0,7,119,105,108,108,109,115,103,0,4,116,101,115,116,0,6,112,117,98,108,105,99>>, ok. -encode_test() -> +parse_publish_test() -> + %%PUBLISH(Qos=1, Retain=false, Dup=false, TopicName=a/b/c, PacketId=1, Payload=<<"hahah">>) + PubBin = <<50,14,0,5,97,47,98,47,99,0,1,104,97,104,97,104>>, + %PUBLISH(Qos=0, Retain=false, Dup=false, TopicName=xxx/yyy, PacketId=undefined, Payload=<<"hello">>) + %DISCONNECT(Qos=0, Retain=false, Dup=false) + PubBin1 = <<48,14,0,7,120,120,120,47,121,121,121,104,101,108,108,111,224,0>>, + ok. + +parse_puback_test() -> + %%PUBACK(Qos=0, Retain=false, Dup=false, PacketId=1) + PubAckBin = <<64,2,0,1>>, + ok. + +parse_subscribe_test() -> + ok. + +parse_pingreq_test() -> + ok. + +parse_disconnect_test() -> + %DISCONNECT(Qos=0, Retain=false, Dup=false) + Bin = <<224, 0>>, + ok. + +serialise_connack_test() -> + ConnAck = #mqtt_packet{ header = #mqtt_packet_header { type = ?CONNACK }, + variable = #mqtt_packet_connack { ack_flags = 0, return_code = 0 } }, + ?assertEqual(<<32,2,0,0>>, emqtt_packet:serialise(ConnAck)). + +serialise_puback_test() -> ok. -endif.