diff --git a/CHANGELOG.md b/CHANGELOG.md index d2607c88e..9011e9dc0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,21 @@ eMQTT ChangeLog ================== +v0.2.1-beta (2015-01-08) +------------------------ + +pull request 26: Use binaries for topic paths and fix wildcard topics + +emqtt_pubsub.erl: fix wildcard topic match bug caused by binary topic in 0.2.0 + +Makefile: deps -> get-deps + +rebar.config: fix mochiweb git url + +tag emqtt release accoding to [Semantic Versioning](http://semver.org/) + +max clientId length is 1024 now. + 0.2.0 (2014-12-07) ------------------- diff --git a/LICENSE b/LICENSE index fe58b384a..287c4750f 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,6 @@ The MIT License (MIT) -Copyright (c) 2014, Feng Lee +Copyright (c) 2012-2015, Feng Lee Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/Makefile b/Makefile index 72e436459..6adbf3bf0 100644 --- a/Makefile +++ b/Makefile @@ -1,9 +1,9 @@ -all: dep compile +all: get-deps compile -compile: dep +compile: get-deps ./rebar compile -dep: +get-deps: ./rebar get-deps clean: diff --git a/README.md b/README.md index 44bedc77d..c4e51fa93 100644 --- a/README.md +++ b/README.md @@ -9,15 +9,15 @@ eMQTT requires Erlang R17+. ## Startup in Five Minutes ``` - $ git clone git://github.com/slimpp/emqtt.git +$ git clone git://github.com/slimpp/emqtt.git - $ cd emqtt +$ cd emqtt - $ make && make dist +$ make && make dist - $ cd rel/emqtt +$ cd rel/emqtt - $ ./bin/emqtt console +$ ./bin/emqtt console ``` ## Deploy and Start @@ -25,18 +25,18 @@ eMQTT requires Erlang R17+. ### start ``` - cp -R rel/emqtt $INSTALL_DIR +cp -R rel/emqtt $INSTALL_DIR - cd $INSTALL_DIR/emqtt +cd $INSTALL_DIR/emqtt - ./bin/emqtt start +./bin/emqtt start ``` ### stop ``` - ./bin/emqtt stop +./bin/emqtt stop ``` @@ -77,7 +77,25 @@ When nodes clustered, vm.args should be configured as below: -name emqtt@host1 ``` -...... +## Cluster + +Suppose we cluster two nodes on 'host1', 'host2', Steps: + +on 'host1': + +``` +./bin/emqtt start +``` + +on 'host2': + +``` +./bin/emqtt start + +./bin/emqtt_ctl cluster emqtt@host1 +``` + +Run './bin/emqtt_ctl cluster' on 'host1' or 'host2' to check cluster nodes. ## Cluster @@ -106,13 +124,13 @@ eMQTT support http to publish message. Example: ``` - curl -v --basic -u user:passwd -d "topic=/a/b/c&message=hello from http..." -k http://localhost:8883/mqtt/publish +curl -v --basic -u user:passwd -d "topic=/a/b/c&message=hello from http..." -k http://localhost:8883/mqtt/publish ``` ### URL ``` - HTTP POST http://host:8883/mqtt/publish +HTTP POST http://host:8883/mqtt/publish ``` ### Parameters @@ -124,7 +142,7 @@ message | Text Message ## Design -[Design Wiki](https://github.com/slimpp/emqtt/wiki) +[Design Wiki](https://github.com/emqtt/emqtt/wiki) ## License @@ -132,5 +150,10 @@ The MIT License (MIT) ## Author -feng at slimchat.io +feng at emqtt.io + +## Thanks + +@hejin1026 (260495915 at qq.com) +@desoulter (assoulter123 at gmail.com) diff --git a/TODO b/TODO index 0b16f9bc4..d3ebc89cd 100644 --- a/TODO +++ b/TODO @@ -1,3 +1,9 @@ + +0.2.2 +===== + +merge pull request#26 + 0.2.0 ===== diff --git a/apps/emqtt/include/emqtt.hrl b/apps/emqtt/include/emqtt.hrl index 375705483..19485c7da 100644 --- a/apps/emqtt/include/emqtt.hrl +++ b/apps/emqtt/include/emqtt.hrl @@ -1,5 +1,5 @@ %%------------------------------------------------------------------------------ -%% Copyright (c) 2014, Feng Lee +%% Copyright (c) 2012-2015, Feng Lee %% %% Permission is hereby granted, free of charge, to any person obtaining a copy %% of this software and associated documentation files (the "Software"), to deal @@ -20,56 +20,72 @@ %% SOFTWARE. %%------------------------------------------------------------------------------ -%% --------------------------------- -%% banner -%% --------------------------------- --define(COPYRIGHT, "Copyright (C) 2014, Feng Lee"). +%%------------------------------------------------------------------------------ +%% Banner +%%------------------------------------------------------------------------------ +-define(COPYRIGHT, "Copyright (C) 2012-2015, Feng Lee "). -define(LICENSE_MESSAGE, "Licensed under MIT"). --define(PROTOCOL_VERSION, "MQTT/3.1"). +-define(PROTOCOL_VERSION, "MQTT/3.1.1"). -define(ERTS_MINIMUM, "6.0"). %%------------------------------------------------------------------------------ -%% MQTT Qos +%% MQTT QoS %%------------------------------------------------------------------------------ -define(QOS_0, 0). -define(QOS_1, 1). -define(QOS_2, 2). --type qos() :: ?QOS_2 | ?QOS_1 | ?QOS_0. +-type mqtt_qos() :: ?QOS_2 | ?QOS_1 | ?QOS_0. %%------------------------------------------------------------------------------ %% MQTT Client %%------------------------------------------------------------------------------ -record(mqtt_client, { - client_id + client_id, + username }). +-type mqtt_client() :: #mqtt_client{}. + +%%------------------------------------------------------------------------------ +%% MQTT Session +%%------------------------------------------------------------------------------ +-record(mqtt_session, { + client_id +}). + +-type mqtt_session() :: #mqtt_session{}. + %%------------------------------------------------------------------------------ %% MQTT Message %%------------------------------------------------------------------------------ --record(mqtt_msg, { - retain, - qos, - topic, - dup, - msgid, - payload, - encoder +-record(mqtt_message, { + qos = ?QOS_0 :: mqtt_qos(), + retain = false :: boolean(), + dup = false :: boolean(), + msgid :: integer(), + topic :: binary(), + payload :: binary() }). --type mqtt_msg() :: #mqtt_msg{}. +-type mqtt_message() :: #mqtt_message{}. %%------------------------------------------------------------------------------ %% MQTT User Management %%------------------------------------------------------------------------------ --record(emqtt_user, { +-record(mqtt_user, { username :: binary(), passwdhash :: binary() }). +%%------------------------------------------------------------------------------ +%% MQTT Authorization +%%------------------------------------------------------------------------------ + +%%TODO: ClientId | Username --> Pub | Sub --> Topics diff --git a/apps/emqtt/include/emqtt_frame.hrl b/apps/emqtt/include/emqtt_frame.hrl deleted file mode 100644 index e878681cd..000000000 --- a/apps/emqtt/include/emqtt_frame.hrl +++ /dev/null @@ -1,91 +0,0 @@ -% -% NOTICE: copy from rabbitmq mqtt-adaper -% - -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License -%% at http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and -%% limitations under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. -%% - --define(PROTOCOL_NAMES, [{3, <<"MQIsdp">>}, {4, <<"MQTT">>}]). - --define(MQTT_PROTO_MAJOR, 3). --define(MQTT_PROTO_MINOR, 1). - --define(CLIENT_ID_MAXLEN, 1024). - -%% frame types - --define(CONNECT, 1). --define(CONNACK, 2). --define(PUBLISH, 3). --define(PUBACK, 4). --define(PUBREC, 5). --define(PUBREL, 6). --define(PUBCOMP, 7). --define(SUBSCRIBE, 8). --define(SUBACK, 9). --define(UNSUBSCRIBE, 10). --define(UNSUBACK, 11). --define(PINGREQ, 12). --define(PINGRESP, 13). --define(DISCONNECT, 14). - -%% connect return codes - --define(CONNACK_ACCEPT, 0). --define(CONNACK_PROTO_VER, 1). %% unacceptable protocol version --define(CONNACK_INVALID_ID, 2). %% identifier rejected --define(CONNACK_SERVER, 3). %% server unavailable --define(CONNACK_CREDENTIALS, 4). %% bad user name or password --define(CONNACK_AUTH, 5). %% not authorized - - --record(mqtt_frame, {fixed, - variable, - payload}). - --record(mqtt_frame_fixed, {type = 0, - dup = 0, - qos = 0, - retain = 0}). - --record(mqtt_frame_connect, {proto_ver, - will_retain, - will_qos, - will_flag, - clean_sess, - keep_alive, - client_id, - will_topic, - will_msg, - username, - password}). - --record(mqtt_frame_connack, {return_code}). - --record(mqtt_frame_publish, {topic_name, - message_id}). - --record(mqtt_frame_subscribe,{message_id, - topic_table}). - --record(mqtt_frame_suback, {message_id, - qos_table = []}). - --record(mqtt_topic, {name, - qos}). - --record(mqtt_frame_other, {other}). - diff --git a/apps/emqtt/include/emqtt_packet.hrl b/apps/emqtt/include/emqtt_packet.hrl new file mode 100644 index 000000000..2bbcebc29 --- /dev/null +++ b/apps/emqtt/include/emqtt_packet.hrl @@ -0,0 +1,134 @@ +%%------------------------------------------------------------------------------ +%% Copyright (c) 2015, Feng Lee +%% +%% Permission is hereby granted, free of charge, to any person obtaining a copy +%% of this software and associated documentation files (the "Software"), to deal +%% in the Software without restriction, including without limitation the rights +%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%% copies of the Software, and to permit persons to whom the Software is +%% furnished to do so, subject to the following conditions: +%% +%% The above copyright notice and this permission notice shall be included in all +%% copies or substantial portions of the Software. +%% +%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%% SOFTWARE. +%%------------------------------------------------------------------------------ +%% +%% The Original Code is from RabbitMQ. +%% + +%%------------------------------------------------------------------------------ +%% MQTT Protocol Version and Levels +%%------------------------------------------------------------------------------ +-define(MQTT_PROTO_V31, 3). +-define(MQTT_PROTO_V311, 4). + +-define(PROTOCOL_NAMES, [{?MQTT_PROTO_V31, <<"MQIsdp">>}, {?MQTT_PROTO_V311, <<"MQTT">>}]). + +-define(MAX_CLIENTID_LEN, 1024). + +%%------------------------------------------------------------------------------ +%% MQTT Control Packet Types +%%------------------------------------------------------------------------------ +-define(RESERVED, 0). %% Reserved +-define(CONNECT, 1). %% Client request to connect to Server +-define(CONNACK, 2). %% Server to Client: Connect acknowledgment +-define(PUBLISH, 3). %% Publish message +-define(PUBACK, 4). %% Publish acknowledgment +-define(PUBREC, 5). %% Publish received (assured delivery part 1) +-define(PUBREL, 6). %% Publish release (assured delivery part 2) +-define(PUBCOMP, 7). %% Publish complete (assured delivery part 3) +-define(SUBSCRIBE, 8). %% Client subscribe request +-define(SUBACK, 9). %% Server Subscribe acknowledgment +-define(UNSUBSCRIBE, 10). %% Unsubscribe request +-define(UNSUBACK, 11). %% Unsubscribe acknowledgment +-define(PINGREQ, 12). %% PING request +-define(PINGRESP, 13). %% PING response +-define(DISCONNECT, 14). %% Client is disconnecting + +%%------------------------------------------------------------------------------ +%% MQTT Connect Return Codes +%%------------------------------------------------------------------------------ +-define(CONNACK_ACCEPT, 0). %% Connection accepted +-define(CONNACK_PROTO_VER, 1). %% Unacceptable protocol version +-define(CONNACK_INVALID_ID, 2). %% Client Identifier is correct UTF-8 but not allowed by the Server +-define(CONNACK_SERVER, 3). %% Server unavailable +-define(CONNACK_CREDENTIALS, 4). %% Username or password is malformed +-define(CONNACK_AUTH, 5). %% Client is not authorized to connect + +%%------------------------------------------------------------------------------ +%% MQTT Erlang Types +%%------------------------------------------------------------------------------ +-type mqtt_packet_type() :: ?RESERVED..?DISCONNECT. + +-type mqtt_packet_id() :: 1..16#ffff | undefined. + + +%%------------------------------------------------------------------------------ +%% MQTT Packet Fixed Header +%%------------------------------------------------------------------------------ +-record(mqtt_packet_header, { + type = ?RESERVED :: mqtt_packet_type(), + dup = fasle :: boolean(), + qos = 0 :: 0 | 1 | 2, + retain = false :: boolean() }). + +%%------------------------------------------------------------------------------ +%% MQTT Packets +%%------------------------------------------------------------------------------ +-record(mqtt_packet_connect, { + proto_ver, + will_retain, + will_qos, + will_flag, + clean_sess, + keep_alive, + client_id, + will_topic, + will_msg, + username, + password }). + +-record(mqtt_packet_connack, { + ack_flags = ?RESERVED, + return_code }). + +-record(mqtt_packet_publish, { + topic_name :: binary(), + packet_id :: mqtt_packet_id() }). + +-record(mqtt_packet_puback, { + packet_id :: mqtt_packet_id() }). + +-record(mqtt_topic, { + name :: binary(), + qos :: 0 | 1 | 2 }). + +-record(mqtt_packet_subscribe, { + packet_id :: mqtt_packet_id(), + topic_table :: list(#mqtt_topic{}) }). + +-record(mqtt_packet_suback, { + packet_id :: mqtt_packet_id(), + qos_table = [] }). + +%%------------------------------------------------------------------------------ +%% MQTT Control Packet +%%------------------------------------------------------------------------------ +-record(mqtt_packet, { + header :: #mqtt_packet_header{}, + variable :: #mqtt_packet_connect{} | #mqtt_packet_connack{} + | #mqtt_packet_publish{} | #mqtt_packet_puback{} + | #mqtt_packet_subscribe{} | #mqtt_packet_suback{} + | mqtt_packet_id(), + payload :: binary() }). + +-type mqtt_packet() :: #mqtt_packet{}. + + diff --git a/apps/emqtt/include/emqtt_topic.hrl b/apps/emqtt/include/emqtt_topic.hrl index c31ea8556..4cb9e4dda 100644 --- a/apps/emqtt/include/emqtt_topic.hrl +++ b/apps/emqtt/include/emqtt_topic.hrl @@ -1,5 +1,5 @@ %%----------------------------------------------------------------------------- -%% Copyright (c) 2014, Feng Lee +%% Copyright (c) 2015, Feng Lee %% %% Permission is hereby granted, free of charge, to any person obtaining a copy %% of this software and associated documentation files (the "Software"), to deal @@ -24,15 +24,15 @@ %% Core PubSub Topic %%------------------------------------------------------------------------------ -record(topic, { - name :: binary(), - node :: node() + name :: binary(), + node :: node() }). -type topic() :: #topic{}. -record(topic_subscriber, { topic :: binary(), - qos = 0 :: integer(), + qos = 0 :: non_neg_integer(), subpid :: pid() }). @@ -44,7 +44,7 @@ -record(topic_trie_edge, { node_id :: binary(), - word :: binary() + word :: binary() | char() }). -record(topic_trie, { @@ -52,3 +52,8 @@ node_id :: binary() }). +%%------------------------------------------------------------------------------ +%% System Topic +%%------------------------------------------------------------------------------ +-define(SYSTOP, <<"$SYS">>). + diff --git a/apps/emqtt/src/emqtt.erl b/apps/emqtt/src/emqtt.erl index 0b57307e0..06d4b7d96 100644 --- a/apps/emqtt/src/emqtt.erl +++ b/apps/emqtt/src/emqtt.erl @@ -1,5 +1,5 @@ %%----------------------------------------------------------------------------- -%% Copyright (c) 2014, Feng Lee +%% Copyright (c) 2012-2015, Feng Lee %% %% Permission is hereby granted, free of charge, to any person obtaining a copy %% of this software and associated documentation files (the "Software"), to deal diff --git a/apps/emqtt/src/emqtt_app.erl b/apps/emqtt/src/emqtt_app.erl index 14c3bf29e..ff3400a56 100644 --- a/apps/emqtt/src/emqtt_app.erl +++ b/apps/emqtt/src/emqtt_app.erl @@ -1,5 +1,5 @@ %%----------------------------------------------------------------------------- -%% Copyright (c) 2014, Feng Lee +%% Copyright (c) 2012-2015, Feng Lee %% %% Permission is hereby granted, free of charge, to any person obtaining a copy %% of this software and associated documentation files (the "Software"), to deal @@ -22,7 +22,7 @@ -module(emqtt_app). --author('feng@slimchat.io'). +-author('feng@emqtt.io'). -behaviour(application). diff --git a/apps/emqtt/src/emqtt_auth.erl b/apps/emqtt/src/emqtt_auth.erl index ba3202e0a..4045c5c1a 100644 --- a/apps/emqtt/src/emqtt_auth.erl +++ b/apps/emqtt/src/emqtt_auth.erl @@ -1,5 +1,5 @@ %%----------------------------------------------------------------------------- -%% Copyright (c) 2014, Feng Lee +%% Copyright (c) 2012-2015, Feng Lee %% %% Permission is hereby granted, free of charge, to any person obtaining a copy %% of this software and associated documentation files (the "Software"), to deal @@ -22,7 +22,7 @@ -module(emqtt_auth). --author('feng@slimchat.io'). +-author('feng@emqtt.io'). -include("emqtt.hrl"). diff --git a/apps/emqtt/src/emqtt_auth_anonymous.erl b/apps/emqtt/src/emqtt_auth_anonymous.erl index a823eeef3..8bd5e4ad6 100644 --- a/apps/emqtt/src/emqtt_auth_anonymous.erl +++ b/apps/emqtt/src/emqtt_auth_anonymous.erl @@ -1,5 +1,5 @@ %%----------------------------------------------------------------------------- -%% Copyright (c) 2014, Feng Lee +%% Copyright (c) 2012-2015, Feng Lee %% %% Permission is hereby granted, free of charge, to any person obtaining a copy %% of this software and associated documentation files (the "Software"), to deal @@ -22,7 +22,7 @@ -module(emqtt_auth_anonymous). --author('feng@slimchat.io'). +-author('feng@emqtt.io'). -export([init/1, add/2, diff --git a/apps/emqtt/src/emqtt_auth_internal.erl b/apps/emqtt/src/emqtt_auth_internal.erl index f2dcef1dd..de6df9ab1 100644 --- a/apps/emqtt/src/emqtt_auth_internal.erl +++ b/apps/emqtt/src/emqtt_auth_internal.erl @@ -1,5 +1,5 @@ %%----------------------------------------------------------------------------- -%% Copyright (c) 2014, Feng Lee +%% Copyright (c) 2015, Feng Lee %% %% Permission is hereby granted, free of charge, to any person obtaining a copy %% of this software and associated documentation files (the "Software"), to deal @@ -22,7 +22,7 @@ -module(emqtt_auth_internal). --author('feng@slimchat.io'). +-author('feng@emqtt.io'). -include("emqtt.hrl"). @@ -32,10 +32,10 @@ delete/1]). init(_Opts) -> - mnesia:create_table(emqtt_user, [ + mnesia:create_table(mqtt_user, [ {ram_copies, [node()]}, - {attributes, record_info(fields, emqtt_user)}]), - mnesia:add_table_copy(emqtt_user, node(), ram_copies), + {attributes, record_info(fields, mqtt_user)}]), + mnesia:add_table_copy(mqtt_user, node(), ram_copies), ok. check(undefined, _) -> false; @@ -44,19 +44,19 @@ check(_, undefined) -> false; check(Username, Password) when is_binary(Username), is_binary(Password) -> PasswdHash = crypto:hash(md5, Password), - case mnesia:dirty_read(emqtt_user, Username) of - [#emqtt_user{passwdhash=PasswdHash}] -> true; + case mnesia:dirty_read(mqtt_user, Username) of + [#mqtt_user{passwdhash=PasswdHash}] -> true; _ -> false end. add(Username, Password) when is_binary(Username) and is_binary(Password) -> mnesia:dirty_write( - #emqtt_user{ + #mqtt_user{ username=Username, passwdhash=crypto:hash(md5, Password) } ). delete(Username) when is_binary(Username) -> - mnesia:dirty_delete(emqtt_user, Username). + mnesia:dirty_delete(mqtt_user, Username). diff --git a/apps/emqtt/src/emqtt_client.erl b/apps/emqtt/src/emqtt_client.erl index e3e18d1b6..cc87e8cf2 100644 --- a/apps/emqtt/src/emqtt_client.erl +++ b/apps/emqtt/src/emqtt_client.erl @@ -1,5 +1,5 @@ %%----------------------------------------------------------------------------- -%% Copyright (c) 2014, Feng Lee +%% Copyright (c) 2012-2015, Feng Lee %% %% Permission is hereby granted, free of charge, to any person obtaining a copy %% of this software and associated documentation files (the "Software"), to deal @@ -22,7 +22,7 @@ -module(emqtt_client). --author('feng@slimchat.io'). +-author('feng@emqtt.io'). -behaviour(gen_server). @@ -37,14 +37,12 @@ -include("emqtt.hrl"). --include("emqtt_frame.hrl"). - %%Client State... --record(conn_state, { +-record(state, { socket, conn_name, await_recv, - connection_state, + conn_state, conserve, parse_state, proto_state, @@ -62,22 +60,24 @@ go(Pid, Sock) -> init([Sock]) -> io:format("client is created: ~p~n", [self()]), - {ok, #conn_state{socket = Sock}, hibernate}. + {ok, #state{socket = Sock}, hibernate}. -handle_call({go, Sock}, _From, State = #conn_state{socket = Sock}) -> +handle_call({go, Sock}, _From, #state{socket = Sock}) -> {ok, ConnStr} = emqtt_net:connection_string(Sock, inbound), io:format("conn from ~s~n", [ConnStr]), {reply, ok, - control_throttle( - #conn_state{ socket = Sock, - conn_name = ConnStr, - await_recv = false, - connection_state = running, - conserve = false, - parse_state = emqtt_frame:initial_state(), - proto_state = emqtt_protocol:initial_state(Sock)})}; + control_throttle( + #state{ socket = Sock, + conn_name = ConnStr, + await_recv = false, + conn_state = running, + conserve = false, + parse_state = emqtt_packet:initial_state(), + proto_state = emqtt_protocol:initial_state(Sock)})}; -handle_call(info, _From, State = #conn_state{conn_name=ConnName, proto_state = ProtoState}) -> +handle_call(info, _From, State = #state{ + conn_name=ConnName, + proto_state = ProtoState}) -> {reply, [{conn_name, ConnName} | emqtt_protocol:info(ProtoState)], State}; handle_call(Req, _From, State) -> @@ -89,22 +89,22 @@ handle_cast(Msg, State) -> handle_info(timeout, State) -> stop({shutdown, timeout}, State); -handle_info({stop, duplicate_id}, State=#conn_state{conn_name=ConnName}) -> +handle_info({stop, duplicate_id}, State=#state{conn_name=ConnName}) -> %%TODO: %lager:error("Shutdown for duplicate clientid:~s, conn:~s", [ClientId, ConnName]), stop({shutdown, duplicate_id}, State); %%TODO: ok?? -handle_info({dispatch, Msg}, #conn_state{proto_state = ProtoState} = State) -> +handle_info({dispatch, Msg}, #state{proto_state = ProtoState} = State) -> {ok, ProtoState1} = emqtt_protocol:send_message(Msg, ProtoState), - {noreply, State#conn_state{proto_state = ProtoState1}}; + {noreply, State#state{proto_state = ProtoState1}}; handle_info({inet_reply, _Ref, ok}, State) -> {noreply, State, hibernate}; -handle_info({inet_async, Sock, _Ref, {ok, Data}}, #conn_state{ socket = Sock}=State) -> +handle_info({inet_async, Sock, _Ref, {ok, Data}}, #state{ socket = Sock}=State) -> process_received_bytes( - Data, control_throttle(State #conn_state{ await_recv = false })); + Data, control_throttle(State #state{ await_recv = false })); handle_info({inet_async, _Sock, _Ref, {error, Reason}}, State) -> network_error(Reason, State); @@ -113,28 +113,28 @@ handle_info({inet_async, _Sock, _Ref, {error, Reason}}, State) -> handle_info({inet_reply, _Sock, {error, Reason}}, State) -> {noreply, State}; -handle_info(keep_alive_timeout, #conn_state{keep_alive=KeepAlive}=State) -> +handle_info(keep_alive_timeout, #state{keep_alive=KeepAlive}=State) -> case emqtt_keep_alive:state(KeepAlive) of idle -> - lager:info("keep_alive timeout: ~p", [State#conn_state.conn_name]), + lager:info("keep_alive timeout: ~p", [State#state.conn_name]), {stop, normal, State}; active -> KeepAlive1 = emqtt_keep_alive:reset(KeepAlive), - {noreply, State#conn_state{keep_alive=KeepAlive1}} + {noreply, State#state{keep_alive=KeepAlive1}} end; handle_info(Info, State) -> lager:error("badinfo :~p",[Info]), {stop, {badinfo, Info}, State}. -terminate(Reason, #conn_state{proto_state = unefined}) -> +terminate(Reason, #state{proto_state = unefined}) -> io:format("client terminated: ~p, reason: ~p~n", [self(), Reason]), %%TODO: fix keep_alive... %%emqtt_keep_alive:cancel(KeepAlive), %emqtt_protocol:client_terminated(ProtoState), ok; -terminate(_Reason, #conn_state{proto_state = ProtoState}) -> +terminate(_Reason, #state{proto_state = ProtoState}) -> %%TODO: fix keep_alive... %%emqtt_keep_alive:cancel(KeepAlive), emqtt_protocol:client_terminated(ProtoState), @@ -156,28 +156,28 @@ process_received_bytes(<<>>, State) -> {noreply, State, hibernate}; process_received_bytes(Bytes, - State = #conn_state{ parse_state = ParseState, + State = #state{ parse_state = ParseState, proto_state = ProtoState, conn_name = ConnStr }) -> - case emqtt_frame:parse(Bytes, ParseState) of + case emqtt_packet:parse(Bytes, ParseState) of {more, ParseState1} -> {noreply, - control_throttle( State #conn_state{ parse_state = ParseState1 }), + control_throttle( State #state{ parse_state = ParseState1 }), hibernate}; - {ok, Frame, Rest} -> - case emqtt_protocol:handle_frame(Frame, ProtoState) of + {ok, Packet, Rest} -> + case emqtt_protocol:handle_packet(Packet, ProtoState) of {ok, ProtoState1} -> process_received_bytes( Rest, - State#conn_state{ parse_state = emqtt_frame:initial_state(), + State#state{ parse_state = emqtt_packet:initial_state(), proto_state = ProtoState1 }); {error, Error} -> lager:error("MQTT protocol error ~p for connection ~p~n", [Error, ConnStr]), stop({shutdown, Error}, State); {error, Error, ProtoState1} -> - stop({shutdown, Error}, State#conn_state{proto_state = ProtoState1}); + stop({shutdown, Error}, State#state{proto_state = ProtoState1}); {stop, ProtoState1} -> - stop(normal, State#conn_state{proto_state = ProtoState1}) + stop(normal, State#state{proto_state = ProtoState1}) end; {error, Error} -> lager:error("MQTT detected framing error ~p for connection ~p~n", [ConnStr, Error]), @@ -186,27 +186,27 @@ process_received_bytes(Bytes, %%---------------------------------------------------------------------------- network_error(Reason, - State = #conn_state{ conn_name = ConnStr}) -> + State = #state{ conn_name = ConnStr}) -> lager:error("MQTT detected network error '~p' for ~p", [Reason, ConnStr]), %%TODO: where to SEND WILL MSG?? %%send_will_msg(State), % todo: flush channel after publish stop({shutdown, conn_closed}, State). -run_socket(State = #conn_state{ connection_state = blocked }) -> +run_socket(State = #state{ conn_state = blocked }) -> State; -run_socket(State = #conn_state{ await_recv = true }) -> +run_socket(State = #state{ await_recv = true }) -> State; -run_socket(State = #conn_state{ socket = Sock }) -> +run_socket(State = #state{ socket = Sock }) -> async_recv(Sock, 0, infinity), - State#conn_state{ await_recv = true }. + State#state{ await_recv = true }. -control_throttle(State = #conn_state{ connection_state = Flow, +control_throttle(State = #state{ conn_state = Flow, conserve = Conserve }) -> case {Flow, Conserve} of - {running, true} -> State #conn_state{ connection_state = blocked }; - {blocked, false} -> run_socket(State #conn_state{ - connection_state = running }); + {running, true} -> State #state{ conn_state = blocked }; + {blocked, false} -> run_socket(State #state{ + conn_state = running }); {_, _} -> run_socket(State) end. diff --git a/apps/emqtt/src/emqtt_cm.erl b/apps/emqtt/src/emqtt_cm.erl index 0182f71e7..64cd451f4 100644 --- a/apps/emqtt/src/emqtt_cm.erl +++ b/apps/emqtt/src/emqtt_cm.erl @@ -1,5 +1,5 @@ %%----------------------------------------------------------------------------- -%% Copyright (c) 2014, Feng Lee +%% Copyright (c) 2012-2015, Feng Lee %% %% Permission is hereby granted, free of charge, to any person obtaining a copy %% of this software and associated documentation files (the "Software"), to deal @@ -23,7 +23,7 @@ %client manager -module(emqtt_cm). --author('feng@slimchat.io'). +-author('feng@emqtt.io'). -behaviour(gen_server). diff --git a/apps/emqtt/src/emqtt_ctl.erl b/apps/emqtt/src/emqtt_ctl.erl index e5f747552..bee4caf5c 100644 --- a/apps/emqtt/src/emqtt_ctl.erl +++ b/apps/emqtt/src/emqtt_ctl.erl @@ -1,5 +1,5 @@ %%----------------------------------------------------------------------------- -%% Copyright (c) 2014, Feng Lee +%% Copyright (c) 2012-2015, Feng Lee %% %% Permission is hereby granted, free of charge, to any person obtaining a copy %% of this software and associated documentation files (the "Software"), to deal @@ -22,7 +22,7 @@ -module(emqtt_ctl). --author('feng@slimchat.io'). +-author('feng@emqtt.io'). -include("emqtt.hrl"). diff --git a/apps/emqtt/src/emqtt_db.erl b/apps/emqtt/src/emqtt_db.erl index 0b5f508ad..961556666 100644 --- a/apps/emqtt/src/emqtt_db.erl +++ b/apps/emqtt/src/emqtt_db.erl @@ -1,5 +1,5 @@ %%----------------------------------------------------------------------------- -%% Copyright (c) 2014, Feng Lee +%% Copyright (c) 2012-2015, Feng Lee %% %% Permission is hereby granted, free of charge, to any person obtaining a copy %% of this software and associated documentation files (the "Software"), to deal @@ -22,7 +22,7 @@ -module(emqtt_db). --author('feng@slimchat.io'). +-author('feng@emqtt.io'). -export([init/0, stop/0]). diff --git a/apps/emqtt/src/emqtt_http.erl b/apps/emqtt/src/emqtt_http.erl index e3bdc4f90..e6aa3ccec 100644 --- a/apps/emqtt/src/emqtt_http.erl +++ b/apps/emqtt/src/emqtt_http.erl @@ -1,5 +1,5 @@ %%------------------------------------------------------------------------------ -%% Copyright (c) 2014, Feng Lee +%% Copyright (c) 2012-2015, Feng Lee %% %% Permission is hereby granted, free of charge, to any person obtaining a copy %% of this software and associated documentation files (the "Software"), to deal @@ -22,7 +22,7 @@ -module(emqtt_http). --author('feng@slimchat.io'). +-author('feng@emqtt.io'). -include("emqtt.hrl"). @@ -46,11 +46,8 @@ handle('POST', "/mqtt/publish", Req) -> lager:info("~p~n", [Params]), Topic = list_to_binary(get_value("topic", Params)), Message = list_to_binary(get_value("message", Params)), - emqtt_pubsub:publish(#mqtt_msg { - retain = 0, - qos = ?QOS_0, + emqtt_pubsub:publish(#mqtt_message { topic = Topic, - dup = 0, payload = Message }), Req:ok({"text/plan", "ok"}); diff --git a/apps/emqtt/src/emqtt_keep_alive.erl b/apps/emqtt/src/emqtt_keep_alive.erl index 873608fb9..048f0dde7 100644 --- a/apps/emqtt/src/emqtt_keep_alive.erl +++ b/apps/emqtt/src/emqtt_keep_alive.erl @@ -1,5 +1,5 @@ %%----------------------------------------------------------------------------- -%% Copyright (c) 2014, Feng Lee +%% Copyright (c) 2012-2015, Feng Lee %% %% Permission is hereby granted, free of charge, to any person obtaining a copy %% of this software and associated documentation files (the "Software"), to deal @@ -22,7 +22,7 @@ -module(emqtt_keep_alive). --author('feng@slimchat.io'). +-author('feng@emqtt.io'). -export([new/2, state/1, diff --git a/apps/emqtt/src/emqtt_monitor.erl b/apps/emqtt/src/emqtt_monitor.erl index 996600707..934aff4fb 100644 --- a/apps/emqtt/src/emqtt_monitor.erl +++ b/apps/emqtt/src/emqtt_monitor.erl @@ -1,5 +1,5 @@ %%----------------------------------------------------------------------------- -%% Copyright (c) 2014, Feng Lee +%% Copyright (c) 2012-2015, Feng Lee %% %% Permission is hereby granted, free of charge, to any person obtaining a copy %% of this software and associated documentation files (the "Software"), to deal @@ -22,7 +22,7 @@ -module(emqtt_monitor). --author('feng@slimchat.io'). +-author('feng@emqtt.io'). -behavior(gen_server). diff --git a/apps/emqtt/src/emqtt_net.erl b/apps/emqtt/src/emqtt_net.erl index e6620c3b7..949f731cf 100644 --- a/apps/emqtt/src/emqtt_net.erl +++ b/apps/emqtt/src/emqtt_net.erl @@ -1,5 +1,5 @@ %%----------------------------------------------------------------------------- -%% Copyright (c) 2014, Feng Lee +%% Copyright (c) 2012-2015, Feng Lee %% %% Permission is hereby granted, free of charge, to any person obtaining a copy %% of this software and associated documentation files (the "Software"), to deal @@ -22,7 +22,7 @@ -module(emqtt_net). --author('feng@slimchat.io'). +-author('feng@emqtt.io'). -export([tcp_name/3, tcp_host/1, getopts/2, setopts/2, getaddr/2, port_to_listeners/1]). diff --git a/apps/emqtt/src/emqtt_frame.erl b/apps/emqtt/src/emqtt_packet.erl similarity index 55% rename from apps/emqtt/src/emqtt_frame.erl rename to apps/emqtt/src/emqtt_packet.erl index 660e2c6f8..f5b58e0a7 100644 --- a/apps/emqtt/src/emqtt_frame.erl +++ b/apps/emqtt/src/emqtt_packet.erl @@ -1,5 +1,5 @@ %%------------------------------------------------------------------------------ -%% Copyright (c) 2014, Feng Lee +%% Copyright (c) 2015, Feng Lee %% %% Permission is hereby granted, free of charge, to any person obtaining a copy %% of this software and associated documentation files (the "Software"), to deal @@ -23,14 +23,14 @@ %% The Original Code is from RabbitMQ. %% --module(emqtt_frame). +-module(emqtt_packet). --include("emqtt_frame.hrl"). +-include("emqtt_packet.hrl"). --export([parse/2, initial_state/0]). --export([serialise/1]). +-export([initial_state/0]). + +-export([parse/2, serialise/1]). --define(RESERVED, 0). -define(MAX_LEN, 16#fffffff). -define(HIGHBIT, 2#10000000). -define(LOWBITS, 2#01111111). @@ -39,30 +39,30 @@ initial_state() -> none. parse(<<>>, none) -> {more, fun(Bin) -> parse(Bin, none) end}; -parse(<>, none) -> - parse_remaining_len(Rest, #mqtt_frame_fixed{ type = MessageType, - dup = bool(Dup), - qos = QoS, - retain = bool(Retain) }); +parse(<>, none) -> + parse_remaining_len(Rest, #mqtt_packet_header{type = PacketType, + dup = bool(Dup), + qos = QoS, + retain = bool(Retain) }); parse(Bin, Cont) -> Cont(Bin). -parse_remaining_len(<<>>, Fixed) -> - {more, fun(Bin) -> parse_remaining_len(Bin, Fixed) end}; -parse_remaining_len(Rest, Fixed) -> - parse_remaining_len(Rest, Fixed, 1, 0). +parse_remaining_len(<<>>, Header) -> + {more, fun(Bin) -> parse_remaining_len(Bin, Header) end}; +parse_remaining_len(Rest, Header) -> + parse_remaining_len(Rest, Header, 1, 0). -parse_remaining_len(_Bin, _Fixed, _Multiplier, Length) +parse_remaining_len(_Bin, _Header, _Multiplier, Length) when Length > ?MAX_LEN -> {error, invalid_mqtt_frame_len}; -parse_remaining_len(<<>>, Fixed, Multiplier, Length) -> - {more, fun(Bin) -> parse_remaining_len(Bin, Fixed, Multiplier, Length) end}; -parse_remaining_len(<<1:1, Len:7, Rest/binary>>, Fixed, Multiplier, Value) -> - parse_remaining_len(Rest, Fixed, Multiplier * ?HIGHBIT, Value + Len * Multiplier); -parse_remaining_len(<<0:1, Len:7, Rest/binary>>, Fixed, Multiplier, Value) -> - parse_frame(Rest, Fixed, Value + Len * Multiplier). +parse_remaining_len(<<>>, Header, Multiplier, Length) -> + {more, fun(Bin) -> parse_remaining_len(Bin, Header, Multiplier, Length) end}; +parse_remaining_len(<<1:1, Len:7, Rest/binary>>, Header, Multiplier, Value) -> + parse_remaining_len(Rest, Header, Multiplier * ?HIGHBIT, Value + Len * Multiplier); +parse_remaining_len(<<0:1, Len:7, Rest/binary>>, Header, Multiplier, Value) -> + parse_frame(Rest, Header, Value + Len * Multiplier). -parse_frame(Bin, #mqtt_frame_fixed{ type = Type, - qos = Qos } = Fixed, Length) -> +parse_frame(Bin, #mqtt_packet_header{ type = Type, + qos = Qos } = Header, Length) -> case {Type, Bin} of {?CONNECT, <>} -> {ProtoName, Rest1} = parse_utf(FrameBin), @@ -83,8 +83,8 @@ parse_frame(Bin, #mqtt_frame_fixed{ type = Type, {PasssWord, <<>>} = parse_utf(Rest7, PasswordFlag), case protocol_name_approved(ProtoVersion, ProtoName) of true -> - wrap(Fixed, - #mqtt_frame_connect{ + wrap(Header, + #mqtt_packet_connect{ proto_ver = ProtoVersion, will_retain = bool(WillRetain), will_qos = WillQos, @@ -101,41 +101,41 @@ parse_frame(Bin, #mqtt_frame_fixed{ type = Type, end; {?PUBLISH, <>} -> {TopicName, Rest1} = parse_utf(FrameBin), - {MessageId, Payload} = case Qos of + {PacketId, Payload} = case Qos of 0 -> {undefined, Rest1}; - _ -> <> = Rest1, - {M, R} + _ -> <> = Rest1, + {Id, R} end, - wrap(Fixed, #mqtt_frame_publish {topic_name = TopicName, - message_id = MessageId }, + wrap(Header, #mqtt_packet_publish {topic_name = TopicName, + packet_id = PacketId }, Payload, Rest); {?PUBACK, <>} -> - <> = FrameBin, - wrap(Fixed, #mqtt_frame_publish{message_id = MessageId}, Rest); + <> = FrameBin, + wrap(Header, #mqtt_packet_puback{packet_id = PacketId}, Rest); {?PUBREC, <>} -> - <> = FrameBin, - wrap(Fixed, #mqtt_frame_publish{message_id = MessageId}, Rest); + <> = FrameBin, + wrap(Header, #mqtt_packet_puback{packet_id = PacketId}, Rest); {?PUBREL, <>} -> - <> = FrameBin, - wrap(Fixed, #mqtt_frame_publish { message_id = MessageId }, Rest); + <> = FrameBin, + wrap(Header, #mqtt_packet_puback{ packet_id = PacketId }, Rest); {?PUBCOMP, <>} -> - <> = FrameBin, - wrap(Fixed, #mqtt_frame_publish { message_id = MessageId }, Rest); + <> = FrameBin, + wrap(Header, #mqtt_packet_puback{ packet_id = PacketId }, Rest); {Subs, <>} when Subs =:= ?SUBSCRIBE orelse Subs =:= ?UNSUBSCRIBE -> 1 = Qos, - <> = FrameBin, + <> = FrameBin, Topics = parse_topics(Subs, Rest1, []), - wrap(Fixed, #mqtt_frame_subscribe { message_id = MessageId, + wrap(Header, #mqtt_packet_subscribe { packet_id = PacketId, topic_table = Topics }, Rest); {Minimal, Rest} when Minimal =:= ?DISCONNECT orelse Minimal =:= ?PINGREQ -> Length = 0, - wrap(Fixed, Rest); + wrap(Header, Rest); {_, TooShortBin} -> {more, fun(BinMore) -> parse_frame(<>, - Fixed, Length) + Header, Length) end} end. @@ -148,12 +148,12 @@ parse_topics(?UNSUBSCRIBE = Sub, Bin, Topics) -> {Name, <>} = parse_utf(Bin), parse_topics(Sub, Rest, [#mqtt_topic { name = Name } | Topics]). -wrap(Fixed, Variable, Payload, Rest) -> - {ok, #mqtt_frame { variable = Variable, fixed = Fixed, payload = Payload }, Rest}. -wrap(Fixed, Variable, Rest) -> - {ok, #mqtt_frame { variable = Variable, fixed = Fixed }, Rest}. -wrap(Fixed, Rest) -> - {ok, #mqtt_frame { fixed = Fixed }, Rest}. +wrap(Header, Variable, Payload, Rest) -> + {ok, #mqtt_packet{ header = Header, variable = Variable, payload = Payload }, Rest}. +wrap(Header, Variable, Rest) -> + {ok, #mqtt_packet { header = Header, variable = Variable }, Rest}. +wrap(Header, Rest) -> + {ok, #mqtt_packet { header = Header }, Rest}. parse_utf(Bin, 0) -> {undefined, Bin}; @@ -173,72 +173,62 @@ bool(1) -> true. %% serialisation -serialise(#mqtt_frame{ fixed = Fixed, - variable = Variable, - payload = Payload }) -> - serialise_variable(Fixed, Variable, serialise_payload(Payload)). +serialise(#mqtt_packet{ header = Header, + variable = Variable, + payload = Payload }) -> + serialise_header(Header, + serialise_variable(Header, Variable, + serialise_payload(Payload))). serialise_payload(undefined) -> <<>>; serialise_payload(B) when is_binary(B) -> B. -serialise_variable(#mqtt_frame_fixed { type = ?CONNACK } = Fixed, - #mqtt_frame_connack { return_code = ReturnCode }, +serialise_variable(#mqtt_packet_header { type = ?CONNACK }, + #mqtt_packet_connack { ack_flags = AckFlags, + return_code = ReturnCode }, <<>> = PayloadBin) -> - VariableBin = <>, - serialise_fixed(Fixed, VariableBin, PayloadBin); + VariableBin = <>, + {VariableBin, PayloadBin}; -serialise_variable(#mqtt_frame_fixed { type = SubAck } = Fixed, - #mqtt_frame_suback { message_id = MessageId, - qos_table = Qos }, +serialise_variable(#mqtt_packet_header { type = SubAck }, + #mqtt_packet_suback { packet_id = PacketId, + qos_table = Qos }, <<>> = _PayloadBin) when SubAck =:= ?SUBACK orelse SubAck =:= ?UNSUBACK -> - VariableBin = <>, - QosBin = << <> || Q <- Qos >>, - serialise_fixed(Fixed, VariableBin, QosBin); + VariableBin = <>, + QosBin = << <> || Q <- Qos >>, + {VariableBin, QosBin}; -serialise_variable(#mqtt_frame_fixed { type = ?PUBLISH, - qos = Qos } = Fixed, - #mqtt_frame_publish { topic_name = TopicName, - message_id = MessageId }, +serialise_variable(#mqtt_packet_header { type = ?PUBLISH, + qos = Qos }, + #mqtt_packet_publish { topic_name = TopicName, + packet_id = PacketId }, PayloadBin) -> TopicBin = serialise_utf(TopicName), - MessageIdBin = case Qos of + PacketIdBin = case Qos of 0 -> <<>>; - 1 -> <>; - 2 -> <> + 1 -> <>; + 2 -> <> end, - serialise_fixed(Fixed, <>, PayloadBin); + {<>, PayloadBin}; -serialise_variable(#mqtt_frame_fixed { type = ?PUBACK } = Fixed, - #mqtt_frame_publish { message_id = MessageId }, - PayloadBin) -> - MessageIdBin = <>, - serialise_fixed(Fixed, MessageIdBin, PayloadBin); +serialise_variable(#mqtt_packet_header { type = PubAck }, + #mqtt_packet_puback { packet_id = PacketId }, + <<>> = _Payload) + when PubAck =:= ?PUBACK; PubAck =:= ?PUBREC; + PubAck =:= ?PUBREL; PubAck =:= ?PUBCOMP -> + {PacketIdBin = <>, <<>>}; -serialise_variable(#mqtt_frame_fixed { type = ?PUBREC } = Fixed, - #mqtt_frame_publish{ message_id = MsgId}, - PayloadBin) -> - serialise_fixed(Fixed, <>, PayloadBin); - -serialise_variable(#mqtt_frame_fixed { type = ?PUBREL } = Fixed, - #mqtt_frame_publish{ message_id = MsgId}, - PayloadBin) -> - serialise_fixed(Fixed, <>, PayloadBin); - -serialise_variable(#mqtt_frame_fixed { type = ?PUBCOMP } = Fixed, - #mqtt_frame_publish{ message_id = MsgId}, - PayloadBin) -> - serialise_fixed(Fixed, <>, PayloadBin); - -serialise_variable(#mqtt_frame_fixed {} = Fixed, +serialise_variable(#mqtt_packet_header { }, undefined, <<>> = _PayloadBin) -> - serialise_fixed(Fixed, <<>>, <<>>). + {<<>>, <<>>}. -serialise_fixed(#mqtt_frame_fixed{ type = Type, - dup = Dup, - qos = Qos, - retain = Retain }, VariableBin, PayloadBin) +serialise_header(#mqtt_packet_header{ type = Type, + dup = Dup, + qos = Qos, + retain = Retain }, + {VariableBin, PayloadBin}) when is_integer(Type) andalso ?CONNECT =< Type andalso Type =< ?DISCONNECT -> Len = size(VariableBin) + size(PayloadBin), true = (Len =< ?MAX_LEN), @@ -264,3 +254,4 @@ opt(X) when is_integer(X) -> X. protocol_name_approved(Ver, Name) -> lists:member({Ver, Name}, ?PROTOCOL_NAMES). + diff --git a/apps/emqtt/src/emqtt_protocol.erl b/apps/emqtt/src/emqtt_protocol.erl index f2504ace5..2f2cd4e76 100644 --- a/apps/emqtt/src/emqtt_protocol.erl +++ b/apps/emqtt/src/emqtt_protocol.erl @@ -1,5 +1,5 @@ %%----------------------------------------------------------------------------- -%% Copyright (c) 2014, Feng Lee +%% Copyright (c) 2012-2015, Feng Lee %% %% Permission is hereby granted, free of charge, to any person obtaining a copy %% of this software and associated documentation files (the "Software"), to deal @@ -24,71 +24,89 @@ -include("emqtt.hrl"). --include("emqtt_frame.hrl"). +-include("emqtt_packet.hrl"). -record(proto_state, { socket, - message_id, + connected = false, %received CONNECT action? + packet_id, client_id, clean_sess, will_msg, + subscriptions, awaiting_ack, - subtopics, awaiting_rel }). -type proto_state() :: #proto_state{}. --export([initial_state/1, handle_frame/2, send_message/2, client_terminated/1]). +-export([initial_state/1]). + +-export([handle_packet/2, send_packet/2, client_terminated/1]). -export([info/1]). --define(FRAME_TYPE(Frame, Type), - Frame = #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }}). +-define(PACKET_TYPE(Packet, Type), + Packet = #mqtt_packet { header = #mqtt_packet_header { type = Type }}). initial_state(Socket) -> #proto_state{ socket = Socket, - message_id = 1, + packet_id = 1, + subscriptions = [], awaiting_ack = gb_trees:empty(), - subtopics = [], awaiting_rel = gb_trees:empty() }. -info(#proto_state{ message_id = MsgId, +info(#proto_state{ packet_id = PacketId, client_id = ClientId, clean_sess = CleanSess, will_msg = WillMsg, - subtopics = SubTopics}) -> - [ {message_id, MsgId}, - {client_id, ClientId}, - {clean_sess, CleanSess}, - {will_msg, WillMsg}, - {subtopics, SubTopics} ]. + subscriptions= Subs }) -> + [ {packet_id, PacketId}, + {client_id, ClientId}, + {clean_sess, CleanSess}, + {will_msg, WillMsg}, + {subscriptions, Subs} ]. --spec handle_frame(Frame, State) -> {ok, NewState} | {error, any()} when - Frame :: #mqtt_frame{}, - State :: proto_state(), - NewState :: proto_state(). +-spec handle_packet(Packet, State) -> {ok, NewState} | {error, any()} when + Packet :: mqtt_packet(), + State :: proto_state(), + NewState :: proto_state(). -handle_frame(Frame = #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }}, +%%CONNECT – Client requests a connection to a Server + +%%A Client can only send the CONNECT Packet once over a Network Connection. 369 +handle_packet(?PACKET_TYPE(Packet, ?CONNECT), State = #proto_state{connected = false}) -> + handle_packet(?CONNECT, Packet, State#proto_state{connected = true}); + +handle_packet(?PACKET_TYPE(Packet, ?CONNECT), State = #proto_state{connected = true}) -> + {error, protocol_bad_connect, State}; + +%%Received other packets when CONNECT not arrived. +handle_packet(_Packet, State = #proto_state{connected = false}) -> + {error, protocol_not_connected, State}; + +handle_packet(?PACKET_TYPE(Packet, Type), State = #proto_state{client_id = ClientId}) -> - lager:info("frame from ~s: ~p", [ClientId, Frame]), - case validate_frame(Type, Frame) of + lager:info("packet from ~s: ~p", [ClientId, Packet]), + case validate_packet(Type, Packet) of ok -> - handle_request(Type, Frame, State); + handle_packet(Type, Packet, State); {error, Reason} -> {error, Reason, State} end. -handle_request(?CONNECT, - #mqtt_frame{ variable = #mqtt_frame_connect{ - username = Username, - password = Password, - proto_ver = ProtoVersion, - clean_sess = CleanSess, - keep_alive = AlivePeriod, - client_id = ClientId } = Var}, State0 = #proto_state{socket = Sock}) -> +handle_packet(?CONNECT, #mqtt_packet { + variable = #mqtt_packet_connect { + username = Username, + password = Password, + proto_ver = ProtoVersion, + clean_sess = CleanSess, + keep_alive = AlivePeriod, + client_id = ClientId } = Var }, + State0 = #proto_state{socket = Sock}) -> + State = State0#proto_state{client_id = ClientId}, {ReturnCode, State1} = case {lists:member(ProtoVersion, proplists:get_keys(?PROTOCOL_NAMES)), @@ -113,202 +131,204 @@ handle_request(?CONNECT, end end, lager:info("recv conn...:~p", [ReturnCode]), - send_frame(Sock, #mqtt_frame{ - fixed = #mqtt_frame_fixed{ type = ?CONNACK }, - variable = #mqtt_frame_connack{ - return_code = ReturnCode }}), + send_packet(Sock, #mqtt_packet { + header = #mqtt_packet_header { type = ?CONNACK }, + variable = #mqtt_packet_connack{ return_code = ReturnCode }}), {ok, State1}; -handle_request(?PUBLISH, Frame=#mqtt_frame{ - fixed = #mqtt_frame_fixed{qos = ?QOS_0}}, State) -> - emqtt_router:route(make_msg(Frame)), +handle_packet(?PUBLISH, Packet = #mqtt_packet { + header = #mqtt_packet_header {qos = ?QOS_0}}, State) -> + emqtt_router:route(make_message(Packet)), {ok, State}; -handle_request(?PUBLISH, - Frame=#mqtt_frame{ - fixed = #mqtt_frame_fixed{qos = ?QOS_1}, - variable = #mqtt_frame_publish{message_id = MsgId}}, +handle_packet(?PUBLISH, Packet = #mqtt_packet { + header = #mqtt_packet_header { qos = ?QOS_1 }, + variable = #mqtt_packet_publish{packet_id = PacketId}}, State=#proto_state{socket=Sock}) -> - emqtt_pubsub:publish(make_msg(Frame)), - send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{ type = ?PUBACK }, - variable = #mqtt_frame_publish{ message_id = MsgId}}), + emqtt_router:route(make_message(Packet)), + send_packet(Sock, #mqtt_packet { header = #mqtt_packet_header{ type = ?PUBACK }, + variable = #mqtt_packet_puback { packet_id = PacketId}}), {ok, State}; -handle_request(?PUBLISH, - Frame=#mqtt_frame{ - fixed = #mqtt_frame_fixed{qos = ?QOS_2}, - variable = #mqtt_frame_publish{message_id = MsgId}}, +handle_packet(?PUBLISH, Packet = #mqtt_packet { + header = #mqtt_packet_header { qos = ?QOS_2 }, + variable = #mqtt_packet_publish{packet_id = PacketId}}, State=#proto_state{socket=Sock}) -> - emqtt_pubsub:publish(make_msg(Frame)), - put({msg, MsgId}, pubrec), - send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{type = ?PUBREC}, - variable = #mqtt_frame_publish{ message_id = MsgId}}), - + %%FIXME: this is not right...should store it first... + emqtt_router:route(make_message(Packet)), + put({msg, PacketId}, pubrec), + send_packet(Sock, #mqtt_packet { header = #mqtt_packet_header{ type = ?PUBREC }, + variable = #mqtt_packet_puback { packet_id = PacketId}}), {ok, State}; -handle_request(?PUBACK, #mqtt_frame{}, State) -> +handle_packet(?PUBACK, #mqtt_packet {}, State) -> + %FIXME Later + {ok, State}; + +handle_packet(?PUBREC, #mqtt_packet { + variable = #mqtt_packet_puback { packet_id = PktId }}, + State=#proto_state{socket=Sock}) -> + %FIXME Later: should release the message here + send_packet(Sock, #mqtt_packet { + header = #mqtt_packet_header { type = ?PUBREL}, + variable = #mqtt_packet_puback { packet_id = PktId}}), + {ok, State}; + +handle_packet(?PUBREL, #mqtt_packet { + variable = #mqtt_packet_puback { packet_id = PktId}}, + State=#proto_state{socket=Sock}) -> + %%FIXME: not right... + erase({msg, PktId}), + send_packet(Sock, #mqtt_packet { header = #mqtt_packet_header { type = ?PUBCOMP}, + variable = #mqtt_packet_puback { packet_id = PktId}}), + {ok, State}; + +handle_packet(?PUBCOMP, #mqtt_packet { + variable = #mqtt_packet_puback{packet_id = _PktId}}, State) -> %TODO: fixme later {ok, State}; -handle_request(?PUBREC, #mqtt_frame{ - variable = #mqtt_frame_publish{message_id = MsgId}}, - State=#proto_state{socket=Sock}) -> - %TODO: fixme later - send_frame(Sock, - #mqtt_frame{fixed = #mqtt_frame_fixed{ type = ?PUBREL}, - variable = #mqtt_frame_publish{ message_id = MsgId}}), - {ok, State}; - -handle_request(?PUBREL, - #mqtt_frame{ - variable = #mqtt_frame_publish{message_id = MsgId}}, - State=#proto_state{socket=Sock}) -> - erase({msg, MsgId}), - send_frame(Sock, - #mqtt_frame{fixed = #mqtt_frame_fixed{ type = ?PUBCOMP}, - variable = #mqtt_frame_publish{ message_id = MsgId}}), - {ok, State}; - -handle_request(?PUBCOMP, #mqtt_frame{ - variable = #mqtt_frame_publish{message_id = _MsgId}}, State) -> - %TODO: fixme later - {ok, State}; - -handle_request(?SUBSCRIBE, - #mqtt_frame{ - variable = #mqtt_frame_subscribe{message_id = MessageId, - topic_table = Topics}, - payload = undefined}, - #proto_state{socket=Sock} = State) -> +handle_packet(?SUBSCRIBE, #mqtt_packet { + variable = #mqtt_packet_subscribe{ + packet_id = PacketId, + topic_table = Topics}, + payload = undefined}, + State = #proto_state{socket=Sock}) -> + %%FIXME: this is not right... [emqtt_pubsub:subscribe({Name, Qos}, self()) || #mqtt_topic{name=Name, qos=Qos} <- Topics], GrantedQos = [Qos || #mqtt_topic{qos=Qos} <- Topics], - send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{type = ?SUBACK}, - variable = #mqtt_frame_suback{ - message_id = MessageId, - qos_table = GrantedQos}}), + send_packet(Sock, #mqtt_packet { header = #mqtt_packet_header { type = ?SUBACK }, + variable = #mqtt_packet_suback{ + packet_id = PacketId, + qos_table = GrantedQos }}), {ok, State}; -handle_request(?UNSUBSCRIBE, - #mqtt_frame{ - variable = #mqtt_frame_subscribe{message_id = MessageId, - topic_table = Topics }, - payload = undefined}, #proto_state{socket = Sock, client_id = ClientId} = State) -> - +handle_packet(?UNSUBSCRIBE, #mqtt_packet { + variable = #mqtt_packet_subscribe{ + packet_id = PacketId, + topic_table = Topics }, + payload = undefined}, + State = #proto_state{socket = Sock, client_id = ClientId}) -> [emqtt_pubsub:unsubscribe(Name, self()) || #mqtt_topic{name=Name} <- Topics], - send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{type = ?UNSUBACK }, - variable = #mqtt_frame_suback{message_id = MessageId }}), + send_packet(Sock, #mqtt_packet { header = #mqtt_packet_header {type = ?UNSUBACK }, + variable = #mqtt_packet_suback{packet_id = PacketId }}), {ok, State}; -%, keep_alive=KeepAlive -handle_request(?PINGREQ, #mqtt_frame{}, #proto_state{socket=Sock}=State) -> - %Keep alive timer - %%TODO:... - %%KeepAlive1 = emqtt_keep_alive:reset(KeepAlive), - send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{ type = ?PINGRESP }}), +handle_packet(?PINGREQ, #mqtt_packet{}, #proto_state{socket=Sock}=State) -> + send_packet(Sock, make_packet(?PINGRESP)), {ok, State}; -handle_request(?DISCONNECT, #mqtt_frame{}, State=#proto_state{client_id=ClientId}) -> +handle_packet(?DISCONNECT, #mqtt_packet{}, State=#proto_state{client_id=ClientId}) -> lager:info("~s disconnected", [ClientId]), {stop, State}. + +make_packet(Type) when Type >= ?CONNECT andalso Type =< ?DISCONNECT -> + #mqtt_packet{ header = #mqtt_packet_header { type = Type } }. + -spec send_message(Message, State) -> {ok, NewState} when - Message :: mqtt_msg(), + Message :: mqtt_message(), State :: proto_state(), NewState :: proto_state(). -send_message(Message, State = #proto_state{socket = Sock, message_id = MsgId}) -> +send_message(Message = #mqtt_message{ + retain = Retain, + qos = Qos, + topic = Topic, + dup = Dup, + payload = Payload}, + State = #proto_state{socket = Sock, packet_id = PacketId}) -> - #mqtt_msg{retain = Retain, - qos = Qos, - topic = Topic, - dup = Dup, - payload = Payload, - encoder = Encoder} = Message, - - Payload1 = - if - Encoder == undefined -> Payload; - true -> Encoder(Payload) - end, - Frame = #mqtt_frame{ - fixed = #mqtt_frame_fixed{type = ?PUBLISH, - qos = Qos, - retain = Retain, - dup = Dup}, - variable = #mqtt_frame_publish{topic_name = Topic, - message_id = if - Qos == ?QOS_0 -> undefined; - true -> MsgId - end}, - payload = Payload1}, + Packet = #mqtt_packet { + header = #mqtt_packet_header { + type = ?PUBLISH, + qos = Qos, + retain = Retain, + dup = Dup }, + variable = #mqtt_packet_publish { + topic_name = Topic, + packet_id = if + Qos == ?QOS_0 -> undefined; + true -> PacketId + end }, + payload = Payload}, - send_frame(Sock, Frame), + send_packet(Sock, Packet), if Qos == ?QOS_0 -> {ok, State}; true -> - {ok, next_msg_id(State)} + {ok, next_packet_id(State)} end. -send_frame(Sock, Frame) -> - lager:info("send frame:~p", [Frame]), - erlang:port_command(Sock, emqtt_frame:serialise(Frame)). +send_packet(Sock, Packet) -> + lager:info("send packet:~p", [Packet]), + %%FIXME Later... + erlang:port_command(Sock, emqtt_packet:serialise(Packet)). %%TODO: fix me later... client_terminated(#proto_state{client_id = ClientId} = State) -> ok. %emqtt_cm:unregister(ClientId, self()). -make_msg(#mqtt_frame{ - fixed = #mqtt_frame_fixed{qos = Qos, - retain = Retain, - dup = Dup}, - variable = #mqtt_frame_publish{topic_name = Topic, - message_id = MessageId}, - payload = Payload}) -> - #mqtt_msg{retain = Retain, - qos = Qos, - topic = Topic, - dup = Dup, - msgid = MessageId, - payload = Payload}. +make_message(#mqtt_packet { + header = #mqtt_packet_header{ + qos = Qos, + retain = Retain, + dup = Dup }, + variable = #mqtt_packet_publish{ + topic_name = Topic, + packet_id = PacketId }, + payload = Payload }) -> -make_will_msg(#mqtt_frame_connect{ will_flag = false }) -> + #mqtt_message{ retain = Retain, + qos = Qos, + topic = Topic, + dup = Dup, + msgid = PacketId, + payload = Payload}. + +make_will_msg(#mqtt_packet_connect{ will_flag = false }) -> undefined; -make_will_msg(#mqtt_frame_connect{ will_retain = Retain, - will_qos = Qos, - will_topic = Topic, - will_msg = Msg }) -> - #mqtt_msg{retain = Retain, - qos = Qos, - topic = Topic, - dup = false, - payload = Msg }. -next_msg_id(State = #proto_state{ message_id = 16#ffff }) -> - State #proto_state{ message_id = 1 }; -next_msg_id(State = #proto_state{ message_id = MsgId }) -> - State #proto_state{ message_id = MsgId + 1 }. +make_will_msg(#mqtt_packet_connect{ will_retain = Retain, + will_qos = Qos, + will_topic = Topic, + will_msg = Msg }) -> + #mqtt_message{ retain = Retain, + qos = Qos, + topic = Topic, + dup = false, + payload = Msg }. + +next_packet_id(State = #proto_state{ packet_id = 16#ffff }) -> + State #proto_state{ packet_id = 1 }; +next_packet_id(State = #proto_state{ packet_id = PacketId }) -> + State #proto_state{ packet_id = PacketId + 1 }. valid_client_id(ClientId) -> ClientIdLen = size(ClientId), - 1 =< ClientIdLen andalso ClientIdLen =< ?CLIENT_ID_MAXLEN. + 1 =< ClientIdLen andalso ClientIdLen =< ?MAX_CLIENTID_LEN. -validate_frame(?PUBLISH, #mqtt_frame{variable = #mqtt_frame_publish{topic_name = Topic}}) -> +validate_packet(?PUBLISH, #mqtt_packet { + variable = #mqtt_packet_publish{ + topic_name = Topic }}) -> case emqtt_topic:validate({publish, Topic}) of true -> ok; false -> {error, badtopic} end; -validate_frame(?UNSUBSCRIBE, #mqtt_frame{variable = #mqtt_frame_subscribe{topic_table = Topics}}) -> +validate_packet(?UNSUBSCRIBE, #mqtt_packet { + variable = #mqtt_packet_subscribe{ + topic_table = Topics }}) -> ErrTopics = [Topic || #mqtt_topic{name=Topic, qos=Qos} <- Topics, not emqtt_topic:validate({subscribe, Topic})], case ErrTopics of @@ -316,7 +336,7 @@ validate_frame(?UNSUBSCRIBE, #mqtt_frame{variable = #mqtt_frame_subscribe{topic_ _ -> lager:error("error topics: ~p", [ErrTopics]), {error, badtopic} end; -validate_frame(?SUBSCRIBE, #mqtt_frame{variable = #mqtt_frame_subscribe{topic_table = Topics}}) -> +validate_packet(?SUBSCRIBE, #mqtt_packet{variable = #mqtt_packet_subscribe{topic_table = Topics}}) -> ErrTopics = [Topic || #mqtt_topic{name=Topic, qos=Qos} <- Topics, not (emqtt_topic:validate({subscribe, Topic}) and (Qos < 3))], case ErrTopics of @@ -324,7 +344,7 @@ validate_frame(?SUBSCRIBE, #mqtt_frame{variable = #mqtt_frame_subscribe{topic_ta _ -> lager:error("error topics: ~p", [ErrTopics]), {error, badtopic} end; -validate_frame(_Type, _Frame) -> +validate_packet(_Type, _Frame) -> ok. maybe_clean_sess(false, _Conn, _ClientId) -> @@ -337,3 +357,4 @@ send_will_msg(#proto_state{will_msg = undefined}) -> ignore; send_will_msg(#proto_state{will_msg = WillMsg }) -> emqtt_router:route(WillMsg). + diff --git a/apps/emqtt/src/emqtt_pubsub.erl b/apps/emqtt/src/emqtt_pubsub.erl index a9785deee..f687b7527 100644 --- a/apps/emqtt/src/emqtt_pubsub.erl +++ b/apps/emqtt/src/emqtt_pubsub.erl @@ -1,5 +1,5 @@ %%----------------------------------------------------------------------------- -%% Copyright (c) 2014, Feng Lee +%% Copyright (c) 2012-2015, Feng Lee %% %% Permission is hereby granted, free of charge, to any person obtaining a copy %% of this software and associated documentation files (the "Software"), to deal @@ -22,7 +22,7 @@ -module(emqtt_pubsub). --author('feng@slimchat.io'). +-author('feng@emqtt.io'). -include("emqtt.hrl"). @@ -82,7 +82,7 @@ topics() -> %% %% @doc Subscribe Topic %% --spec subscribe({Topic :: binary(), Qos :: qos()}, SubPid :: pid()) -> any(). +-spec subscribe({Topic :: binary(), Qos :: mqtt_qos()}, SubPid :: pid()) -> any(). subscribe({Topic, Qos}, SubPid) when is_binary(Topic) and is_pid(SubPid) -> gen_server:call(?SERVER, {subscribe, {Topic, Qos}, SubPid}). @@ -96,11 +96,11 @@ unsubscribe(Topic, SubPid) when is_binary(Topic) and is_pid(SubPid) -> %% %% @doc Publish to cluster node. %% --spec publish(Msg :: mqtt_msg()) -> ok. -publish(Msg=#mqtt_msg{topic=Topic}) -> +-spec publish(Msg :: mqtt_message()) -> ok. +publish(Msg=#mqtt_message{topic=Topic}) -> publish(Topic, Msg). --spec publish(Topic :: binary(), Msg :: mqtt_msg()) -> any(). +-spec publish(Topic :: binary(), Msg :: mqtt_message()) -> any(). publish(Topic, Msg) when is_binary(Topic) -> lists:foreach(fun(#topic{name=Name, node=Node}) -> case Node =:= node() of @@ -247,10 +247,10 @@ trie_match(NodeId, [W|Words], ResAcc) -> [#topic_trie{node_id=ChildId}] -> trie_match(ChildId, Words, Acc); [] -> Acc end - end, 'trie_match_#'(NodeId, ResAcc), [W, "+"]). + end, 'trie_match_#'(NodeId, ResAcc), [W, <<"+">>]). 'trie_match_#'(NodeId, ResAcc) -> - case mnesia:read(topic_trie, #topic_trie_edge{node_id=NodeId, word="#"}) of + case mnesia:read(topic_trie, #topic_trie_edge{node_id=NodeId, word = <<"#">>}) of [#topic_trie{node_id=ChildId}] -> mnesia:read(topic_trie_node, ChildId) ++ ResAcc; [] -> diff --git a/apps/emqtt/src/emqtt_queue.erl b/apps/emqtt/src/emqtt_queue.erl index da715a542..bb6e3cb77 100644 --- a/apps/emqtt/src/emqtt_queue.erl +++ b/apps/emqtt/src/emqtt_queue.erl @@ -1,5 +1,5 @@ %%----------------------------------------------------------------------------- -%% Copyright (c) 2014, Feng Lee +%% Copyright (c) 2012-2015, Feng Lee %% %% Permission is hereby granted, free of charge, to any person obtaining a copy %% of this software and associated documentation files (the "Software"), to deal diff --git a/apps/emqtt/src/emqtt_queue_sup.erl b/apps/emqtt/src/emqtt_queue_sup.erl index 3cfa25383..c49e2e1a8 100644 --- a/apps/emqtt/src/emqtt_queue_sup.erl +++ b/apps/emqtt/src/emqtt_queue_sup.erl @@ -1,5 +1,5 @@ %%----------------------------------------------------------------------------- -%% Copyright (c) 2014, Feng Lee +%% Copyright (c) 2012-2015, Feng Lee %% %% Permission is hereby granted, free of charge, to any person obtaining a copy %% of this software and associated documentation files (the "Software"), to deal @@ -21,7 +21,7 @@ %%------------------------------------------------------------------------------ -module(emqtt_queue_sup). --author('feng@slimchat.io'). +-author('feng@emqtt.io'). -behavior(supervisor). diff --git a/apps/emqtt/src/emqtt_retained.erl b/apps/emqtt/src/emqtt_retained.erl index 69d643761..2db923316 100644 --- a/apps/emqtt/src/emqtt_retained.erl +++ b/apps/emqtt/src/emqtt_retained.erl @@ -1,5 +1,5 @@ %%----------------------------------------------------------------------------- -%% Copyright (c) 2014, Feng Lee +%% Copyright (c) 2012-2015, Feng Lee %% %% Permission is hereby granted, free of charge, to any person obtaining a copy %% of this software and associated documentation files (the "Software"), to deal @@ -22,7 +22,7 @@ -module(emqtt_retained). --author('feng@slimchat.io'). +-author('feng@emqtt.io'). %%TODO: FIXME Later... diff --git a/apps/emqtt/src/emqtt_router.erl b/apps/emqtt/src/emqtt_router.erl index 43cc16268..9b552e8d3 100644 --- a/apps/emqtt/src/emqtt_router.erl +++ b/apps/emqtt/src/emqtt_router.erl @@ -1,5 +1,5 @@ %%----------------------------------------------------------------------------- -%% Copyright (c) 2014, Feng Lee +%% Copyright (c) 2012-2015, Feng Lee %% %% Permission is hereby granted, free of charge, to any person obtaining a copy %% of this software and associated documentation files (the "Software"), to deal @@ -25,8 +25,6 @@ -include("emqtt.hrl"). --include("emqtt_frame.hrl"). - -behaviour(gen_server). -define(SERVER, ?MODULE). @@ -56,7 +54,7 @@ start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). --spec route(Msg :: mqtt_msg()) -> any(). +-spec route(Msg :: mqtt_message()) -> any(). route(Msg) -> emqtt_pubsub:publish(retained(Msg)). @@ -85,7 +83,8 @@ code_change(_OldVsn, State, _Extra) -> %% ------------------------------------------------------------------ %% Internal Function Definitions %% ------------------------------------------------------------------ -retained(Msg = #mqtt_msg{retain = true, topic = Topic}) -> +retained(Msg = #mqtt_message{retain = true, topic = Topic}) -> emqtt_retained:insert(Topic, Msg), Msg; retained(Msg) -> Msg. + diff --git a/apps/emqtt/src/emqtt_session.erl b/apps/emqtt/src/emqtt_session.erl new file mode 100644 index 000000000..35ade693e --- /dev/null +++ b/apps/emqtt/src/emqtt_session.erl @@ -0,0 +1,24 @@ +%%----------------------------------------------------------------------------- +%% Copyright (c) 2015, Feng Lee +%% +%% Permission is hereby granted, free of charge, to any person obtaining a copy +%% of this software and associated documentation files (the "Software"), to deal +%% in the Software without restriction, including without limitation the rights +%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%% copies of the Software, and to permit persons to whom the Software is +%% furnished to do so, subject to the following conditions: +%% +%% The above copyright notice and this permission notice shall be included in all +%% copies or substantial portions of the Software. +%% +%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%% SOFTWARE. +%%------------------------------------------------------------------------------ + +-module(emqtt_session). + diff --git a/apps/emqtt/src/emqtt_sm.erl b/apps/emqtt/src/emqtt_sm.erl index cbcb477d9..508947725 100644 --- a/apps/emqtt/src/emqtt_sm.erl +++ b/apps/emqtt/src/emqtt_sm.erl @@ -1,5 +1,99 @@ +%%----------------------------------------------------------------------------- +%% Copyright (c) 2012-2015, Feng Lee +%% +%% Permission is hereby granted, free of charge, to any person obtaining a copy +%% of this software and associated documentation files (the "Software"), to deal +%% in the Software without restriction, including without limitation the rights +%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%% copies of the Software, and to permit persons to whom the Software is +%% furnished to do so, subject to the following conditions: +%% +%% The above copyright notice and this permission notice shall be included in all +%% copies or substantial portions of the Software. +%% +%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%% SOFTWARE. +%%------------------------------------------------------------------------------ + + +%%------------------------------------------------------------------------------ +%% +%% The Session state in the Server consists of: +%% The existence of a Session, even if the rest of the Session state is empty. +%% The Client’s subscriptions. +%% QoS 1 and QoS 2 messages which have been sent to the Client, but have not been completely +%% acknowledged. +%% QoS 1 and QoS 2 messages pending transmission to the Client. +%% QoS 2 messages which have been received from the Client, but have not been completely +%% acknowledged. +%% Optionally, QoS 0 messages pending transmission to the Client. +%% +%%------------------------------------------------------------------------------ + -module(emqtt_sm). %%emqtt session manager... %%cleanSess: true | false + +-include("emqtt.hrl"). + +-behaviour(gen_server). + +-define(SERVER, ?MODULE). + +%% ------------------------------------------------------------------ +%% API Function Exports +%% ------------------------------------------------------------------ + +-export([start_link/0]). + +-export([create/2, resume/2, destroy/1]). + +%% ------------------------------------------------------------------ +%% gen_server Function Exports +%% ------------------------------------------------------------------ + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +%% ------------------------------------------------------------------ +%% API Function Definitions +%% ------------------------------------------------------------------ + +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +create(ClientId, Pid) -> ok. + +resume(ClientId, Pid) -> ok. + +destroy(ClientId) -> ok. + +%% ------------------------------------------------------------------ +%% gen_server Function Definitions +%% ------------------------------------------------------------------ + +init(Args) -> + {ok, Args}. + +handle_call(_Request, _From, State) -> + {reply, ok, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + diff --git a/apps/emqtt/src/emqtt_sup.erl b/apps/emqtt/src/emqtt_sup.erl index de3d053b3..567f19a58 100644 --- a/apps/emqtt/src/emqtt_sup.erl +++ b/apps/emqtt/src/emqtt_sup.erl @@ -1,5 +1,5 @@ %%----------------------------------------------------------------------------- -%% Copyright (c) 2014, Feng Lee +%% Copyright (c) 2012-2015, Feng Lee %% %% Permission is hereby granted, free of charge, to any person obtaining a copy %% of this software and associated documentation files (the "Software"), to deal @@ -22,7 +22,7 @@ -module(emqtt_sup). --author('feng@slimchat.io'). +-author('feng@emqtt.io'). -include("emqtt.hrl"). diff --git a/apps/emqtt/src/emqtt_topic.erl b/apps/emqtt/src/emqtt_topic.erl index 29d9d865d..d9ba5d41b 100644 --- a/apps/emqtt/src/emqtt_topic.erl +++ b/apps/emqtt/src/emqtt_topic.erl @@ -1,5 +1,5 @@ %%----------------------------------------------------------------------------- -%% Copyright (c) 2014, Feng Lee +%% Copyright (c) 2012-2015, Feng Lee %% %% Permission is hereby granted, free of charge, to any person obtaining a copy %% of this software and associated documentation files (the "Software"), to deal @@ -22,7 +22,7 @@ -module(emqtt_topic). --author('feng@slimchat.io'). +-author('feng@emqtt.io'). -import(lists, [reverse/1]). diff --git a/apps/emqtt/test/emqtt_packet_tests.erl b/apps/emqtt/test/emqtt_packet_tests.erl new file mode 100644 index 000000000..de520f777 --- /dev/null +++ b/apps/emqtt/test/emqtt_packet_tests.erl @@ -0,0 +1,14 @@ +-module(emqtt_packet_tests). + +-ifdef(TEST). + +-include_lib("eunit/include/eunit.hrl"). + +decode_test() -> + ok. + +encode_test() -> + ok. + +-endif. + diff --git a/data/.placeholder b/data/.placeholder new file mode 100644 index 000000000..0fec8a3ed --- /dev/null +++ b/data/.placeholder @@ -0,0 +1 @@ +durable queue data... diff --git a/doc/protocol.md b/doc/protocol.md index 2043706b9..77ba565b3 100644 --- a/doc/protocol.md +++ b/doc/protocol.md @@ -30,3 +30,7 @@ Connection, others can span multiple consecutive Network Connections between a C An expression contained in a Subscription, to indicate an interest in one or more topics. A Topic Filter can include wildcard characters. + +## Packet Identifier + +