diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 000000000..d2607c88e --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,56 @@ +eMQTT ChangeLog +================== + +0.2.0 (2014-12-07) +------------------- + +rewrite the project, integrate with esockd, mochiweb + +support MQTT 3.1.1 + +support HTTP to publish message + +0.1.5 (2013-01-05) +------------------- + +Bugfix: remove QOS_1 match when handle PUBREL request + +Bugfix: reverse word in emqtt_topic:words/1 function + +0.1.4 (2013-01-04) +------------------- + +Bugfix: fix "mosquitto_sub -q 2 ......" bug + +Bugfix: fix keep alive bug + +0.1.3 (2012-01-04) +------------------- + +Feature: support QOS2 PUBREC, PUBREL,PUBCOMP messages + +Bugfix: fix emqtt_frame to encode/decoe PUBREC/PUBREL messages + + +0.1.2 (2012-12-27) +------------------- + +Feature: release support like riak + +Bugfix: use ?INFO/?ERROR to print log in tcp_listener.erl + + +0.1.1 (2012-09-24) +------------------- + +Feature: use rebar to generate release + +Feature: support retained messages + +Bugfix: send will msg when network error + +0.1.0 (2012-09-21) +------------------- + +The first public release. + diff --git a/CHANGES b/CHANGES deleted file mode 100644 index a04c03cc1..000000000 --- a/CHANGES +++ /dev/null @@ -1,39 +0,0 @@ -Changes with emqtt 0.1.5 05 Jan 2012 - - *) Bugfix: remove QOS_1 match when handle PUBREL request - - *) Bugfix: reverse word in emqtt_topic:words/1 function - - -Changes with emqtt 0.1.4 04 Jan 2012 - - *) Bugfix: fix "mosquitto_sub -q 2 ......" bug - - *) Bugfix: fix keep alive bug - -Changes with emqtt 0.1.3 04 Jan 2012 - - *) Feature: support QOS2 PUBREC, PUBREL,PUBCOMP messages - - *) Bugfix: fix emqtt_frame to encode/decoe PUBREC/PUBREL messages - - -Changes with emqtt 0.1.2 27 Dec 2012 - - *) Feature: release support like riak - - *) Bugfix: use ?INFO/?ERROR to print log in tcp_listener.erl - - -Changes with emqtt 0.1.1 24 Dec 2012 - - *) Feature: use rebar to generate release - - *) Feature: support retained messages - - *) Bugfix: send will msg when network error - -Changes with emqtt 0.1.0 21 Dec 2012 - - *) The first public release. - diff --git a/LICENSE b/LICENSE index 0e6702163..fe58b384a 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,6 @@ The MIT License (MIT) -Copyright (c) 2014, Feng Lee +Copyright (c) 2014, Feng Lee Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/README.md b/README.md index 030a92698..de4686df3 100644 --- a/README.md +++ b/README.md @@ -1,74 +1,85 @@ -emqtt -===== +# eMQTT -erlang mqtt broker. +eMQTT is a scalable, fault-tolerant and extensible mqtt broker written in Erlang/OTP. -requires -======== +eMQTT support MQTT V3.1 Protocol Specification. -erlang R15B+ +eMQTT requires Erlang R17+. -git client +## Startup in Five Minutes -build -======= +``` + $ git clone git://github.com/slimpp/emqtt.git -make + $ cd emqtt -release -======= + $ make && make dist -make generate + $ cd rel/emqtt -deloy -===== + $ ./bin/emqtt console +``` -cp -R rel/emqtt $INSTALL_DIR +## Deploy and Start -start -====== +### start -cd $INSTALL_DRI/emqtt +``` + cp -R rel/emqtt $INSTALL_DIR -./bin/emqtt console + cd $INSTALL_DIR/emqtt -or + ./bin/emqtt start -./bin/emqtt start +``` -status -====== +### stop -./bin/emqtt_ctl status +``` + ./bin/emqtt stop -stop -==== +``` -./bin/emqtt stop +## Configuration -logs -==== +...... -log/* +## Admin and Cluster -http api -======== +...... -curl -v --basic -u user:passwd -d "topic=/abc&message=akakakk&qos=0" -k http://localhost:8883/mqtt/publish +## HTTP API -design -===== +eMQTT support http to publish message. -https://github.com/slimpp/emqtt/wiki +Example: -author -===== +``` + curl -v --basic -u user:passwd -d "topic=/a/b/c&message=hello from http..." -k http://localhost:8883/mqtt/publish +``` -Ery Lee +### URL +``` + HTTP POST http://host:8883/mqtt/publish +``` -license -====== +### Parameters + +Name | Description +-----|------------- +topic | MQTT Topic +message | Text Message + +## Design + +[Design Wiki](https://github.com/slimpp/emqtt/wiki) + +## License The MIT License (MIT) +## Author + +feng at slimchat.io + diff --git a/apps/emqtt/include/emqtt_frame.hrl b/apps/emqtt/include/emqtt_frame.hrl index ef68345de..9c233c6e5 100644 --- a/apps/emqtt/include/emqtt_frame.hrl +++ b/apps/emqtt/include/emqtt_frame.hrl @@ -18,11 +18,15 @@ %% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% --define(PROTOCOL_NAMES, [{3, "MQIsdp"}, {4, "MQTT"}]). +-define(CLIENT_ID_MAXLEN, 1024). + +-define(PROTOCOL_NAMES, [{3, <<"MQIsdp">>}, {4, <<"MQTT">>}]). -define(MQTT_PROTO_MAJOR, 3). -define(MQTT_PROTO_MINOR, 1). +-define(CLIENT_ID_MAXLEN, 23). + %% frame types -define(CONNECT, 1). @@ -49,6 +53,21 @@ -define(CONNACK_CREDENTIALS, 4). %% bad user name or password -define(CONNACK_AUTH, 5). %% not authorized +-record(state, {socket, + conn_name, + await_recv, + connection_state, + conserve, + parse_state, + message_id, + client_id, + clean_sess, + will_msg, + keep_alive, + awaiting_ack, + subtopics, + awaiting_rel}). + -record(mqtt_frame, {fixed, variable, diff --git a/apps/emqtt/src/emqtt_auth.erl b/apps/emqtt/src/emqtt_auth.erl index 131647cf9..0905ef8ee 100644 --- a/apps/emqtt/src/emqtt_auth.erl +++ b/apps/emqtt/src/emqtt_auth.erl @@ -30,7 +30,7 @@ -export([start_link/0, add/2, - check/2, + check/1, check/2, delete/1]). -behavior(gen_server). @@ -42,9 +42,15 @@ terminate/2, code_change/3]). +-define(TAB, ?MODULE). + start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). +-spec check({Usename :: binary(), Password :: binary()}) -> true | false. +check({Username, Password}) -> + execute(check, [Username, Password]). + -spec check(Usename :: binary(), Password :: binary()) -> true | false. check(Username, Password) -> execute(check, [Username, Password]). @@ -58,15 +64,15 @@ delete(Username) -> execute(delete, [Username]). execute(F, Args) -> - [{_, M}] = ets:lookup(emqtt_auth, mod), + [{_, M}] = ets:lookup(?TAB, mod), apply(M, F, Args). init([]) -> {ok, {Name, Opts}} = application:get_env(auth), AuthMod = authmod(Name), ok = AuthMod:init(Opts), - ets:new(emqtt_auth, [named_table, protected]), - ets:insert(emqtt_quth, {mod, AuthMod}), + ets:new(?TAB, [named_table, protected]), + ets:insert(?TAB, {mod, AuthMod}), ?PRINT("emqtt authmod is ~p", [AuthMod]), {ok, undefined}. diff --git a/apps/emqtt/src/emqtt_auth_internal.erl b/apps/emqtt/src/emqtt_auth_internal.erl index 0824fddde..f2dcef1dd 100644 --- a/apps/emqtt/src/emqtt_auth_internal.erl +++ b/apps/emqtt/src/emqtt_auth_internal.erl @@ -22,6 +22,8 @@ -module(emqtt_auth_internal). +-author('feng@slimchat.io'). + -include("emqtt.hrl"). -export([init/1, diff --git a/apps/emqtt/src/emqtt_client.erl b/apps/emqtt/src/emqtt_client.erl index d6b62cfe0..ef7a01d2d 100644 --- a/apps/emqtt/src/emqtt_client.erl +++ b/apps/emqtt/src/emqtt_client.erl @@ -43,8 +43,6 @@ -include("emqtt_frame.hrl"). --define(CLIENT_ID_MAXLEN, 23). - -record(state, {socket, conn_name, await_recv, @@ -233,140 +231,6 @@ process_frame(Frame = #mqtt_frame{fixed = #mqtt_frame_fixed{type = Type}}, {err, Reason, State} end. -process_request(?CONNECT, - #mqtt_frame{ variable = #mqtt_frame_connect{ - username = Username, - password = Password, - proto_ver = ProtoVersion, - clean_sess = CleanSess, - keep_alive = AlivePeriod, - client_id = ClientId } = Var}, #state{socket = Sock} = State) -> - {ReturnCode, State1} = - case {ProtoVersion =:= ?MQTT_PROTO_MAJOR, - valid_client_id(ClientId)} of - {false, _} -> - {?CONNACK_PROTO_VER, State}; - {_, false} -> - {?CONNACK_INVALID_ID, State}; - _ -> - case emqtt_auth:check(Username, Password) of - false -> - ?ERROR_MSG("MQTT login failed - no credentials"), - {?CONNACK_CREDENTIALS, State}; - true -> - ?INFO("connect from clientid: ~p, ~p", [ClientId, AlivePeriod]), - emqtt_cm:create(ClientId, self()), - KeepAlive = emqtt_keep_alive:new(AlivePeriod*1500, keep_alive_timeout), - {?CONNACK_ACCEPT, - State #state{ will_msg = make_will_msg(Var), - client_id = ClientId, - keep_alive = KeepAlive}} - end - end, - ?INFO("recv conn...:~p", [ReturnCode]), - send_frame(Sock, #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?CONNACK}, - variable = #mqtt_frame_connack{ - return_code = ReturnCode }}), - {ok, State1}; - -process_request(?PUBLISH, Frame=#mqtt_frame{ - fixed = #mqtt_frame_fixed{qos = ?QOS_0}}, State) -> - emqtt_pubsub:publish(make_msg(Frame)), - {ok, State}; - -process_request(?PUBLISH, - Frame=#mqtt_frame{ - fixed = #mqtt_frame_fixed{qos = ?QOS_1}, - variable = #mqtt_frame_publish{message_id = MsgId}}, - State=#state{socket=Sock}) -> - emqtt_pubsub:publish(make_msg(Frame)), - send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{ type = ?PUBACK }, - variable = #mqtt_frame_publish{ message_id = MsgId}}), - {ok, State}; - -process_request(?PUBLISH, - Frame=#mqtt_frame{ - fixed = #mqtt_frame_fixed{qos = ?QOS_2}, - variable = #mqtt_frame_publish{message_id = MsgId}}, - State=#state{socket=Sock}) -> - emqtt_pubsub:publish(make_msg(Frame)), - put({msg, MsgId}, pubrec), - send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{type = ?PUBREC}, - variable = #mqtt_frame_publish{ message_id = MsgId}}), - - {ok, State}; - -process_request(?PUBACK, #mqtt_frame{}, State) -> - %TODO: fixme later - {ok, State}; - -process_request(?PUBREC, #mqtt_frame{ - variable = #mqtt_frame_publish{message_id = MsgId}}, - State=#state{socket=Sock}) -> - %TODO: fixme later - send_frame(Sock, - #mqtt_frame{fixed = #mqtt_frame_fixed{ type = ?PUBREL}, - variable = #mqtt_frame_publish{ message_id = MsgId}}), - {ok, State}; - -process_request(?PUBREL, - #mqtt_frame{ - variable = #mqtt_frame_publish{message_id = MsgId}}, - State=#state{socket=Sock}) -> - erase({msg, MsgId}), - send_frame(Sock, - #mqtt_frame{fixed = #mqtt_frame_fixed{ type = ?PUBCOMP}, - variable = #mqtt_frame_publish{ message_id = MsgId}}), - {ok, State}; - -process_request(?PUBCOMP, #mqtt_frame{ - variable = #mqtt_frame_publish{message_id = _MsgId}}, State) -> - %TODO: fixme later - {ok, State}; - -process_request(?SUBSCRIBE, - #mqtt_frame{ - variable = #mqtt_frame_subscribe{message_id = MessageId, - topic_table = Topics}, - payload = undefined}, - #state{socket=Sock} = State) -> - - [emqtt_pubsub:subscribe({Name, Qos}, self()) || - #mqtt_topic{name=Name, qos=Qos} <- Topics], - - GrantedQos = [Qos || #mqtt_topic{qos=Qos} <- Topics], - - send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{type = ?SUBACK}, - variable = #mqtt_frame_suback{ - message_id = MessageId, - qos_table = GrantedQos}}), - - {ok, State}; - -process_request(?UNSUBSCRIBE, - #mqtt_frame{ - variable = #mqtt_frame_subscribe{message_id = MessageId, - topic_table = Topics }, - payload = undefined}, #state{socket = Sock, client_id = ClientId} = State) -> - - - [emqtt_pubsub:unsubscribe(Name, self()) || #mqtt_topic{name=Name} <- Topics], - - send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{type = ?UNSUBACK }, - variable = #mqtt_frame_suback{message_id = MessageId }}), - - {ok, State}; - -process_request(?PINGREQ, #mqtt_frame{}, #state{socket=Sock, keep_alive=KeepAlive}=State) -> - %Keep alive timer - KeepAlive1 = emqtt_keep_alive:reset(KeepAlive), - send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{ type = ?PINGRESP }}), - {ok, State#state{keep_alive=KeepAlive1}}; - -process_request(?DISCONNECT, #mqtt_frame{}, State=#state{client_id=ClientId}) -> - ?INFO("~s disconnected", [ClientId]), - {stop, State}. - next_msg_id(State = #state{ message_id = 16#ffff }) -> State #state{ message_id = 1 }; next_msg_id(State = #state{ message_id = MsgId }) -> @@ -428,7 +292,7 @@ stop(Reason, State ) -> {stop, Reason, State}. valid_client_id(ClientId) -> - ClientIdLen = length(ClientId), + ClientIdLen = size(ClientId), 1 =< ClientIdLen andalso ClientIdLen =< ?CLIENT_ID_MAXLEN. handle_retained(?PUBLISH, #mqtt_frame{fixed = #mqtt_frame_fixed{retain = false}}) -> diff --git a/apps/emqtt/src/emqtt_frame.erl b/apps/emqtt/src/emqtt_frame.erl index aa6b31e20..660e2c6f8 100644 --- a/apps/emqtt/src/emqtt_frame.erl +++ b/apps/emqtt/src/emqtt_frame.erl @@ -161,7 +161,7 @@ parse_utf(Bin, _) -> parse_utf(Bin). parse_utf(<>) -> - {binary_to_list(Str), Rest}. + {Str, Rest}. parse_msg(Bin, 0) -> {undefined, Bin}; diff --git a/apps/emqtt/src/emqtt_http.erl b/apps/emqtt/src/emqtt_http.erl index cf4918090..2458c4241 100644 --- a/apps/emqtt/src/emqtt_http.erl +++ b/apps/emqtt/src/emqtt_http.erl @@ -26,6 +26,8 @@ -include("emqtt.hrl"). +-include("emqtt_log.hrl"). + -import(proplists, [get_value/2, get_value/3]). -export([handle/1]). @@ -43,8 +45,8 @@ handle(Req) -> handle('POST', "/mqtt/publish", Req) -> Params = mochiweb_request:parse_post(Req), - error_logger:info_msg("~p~n", [Params]), - Topic = get_value("topic", Params), + ?INFO("~p~n", [Params]), + Topic = list_to_binary(get_value("topic", Params)), Message = list_to_binary(get_value("message", Params)), emqtt_pubsub:publish(#mqtt_msg { retain = 0, @@ -66,12 +68,9 @@ authorized(Req) -> undefined -> false; "Basic " ++ BasicAuth -> - {Username, Password} = user_passwd(BasicAuth), - emqtt_auth:check(Username, Password) + emqtt_auth:check(user_passwd(BasicAuth)) end. user_passwd(BasicAuth) -> - [U, P] = binary:split(base64:decode(BasicAuth), <<":">>), - {binary_to_list(U), binary_to_list(P)}. - + list_to_tuple(binary:split(base64:decode(BasicAuth), <<":">>)). diff --git a/apps/emqtt/src/emqtt_net.erl b/apps/emqtt/src/emqtt_net.erl index c4061fd31..e6620c3b7 100644 --- a/apps/emqtt/src/emqtt_net.erl +++ b/apps/emqtt/src/emqtt_net.erl @@ -24,7 +24,7 @@ -author('feng@slimchat.io'). --export([tcp_name/3, tcp_host/1, getaddr/2, port_to_listeners/1]). +-export([tcp_name/3, tcp_host/1, getopts/2, setopts/2, getaddr/2, port_to_listeners/1]). -export([connection_string/2]). diff --git a/apps/emqtt/src/emqtt_protocol.erl b/apps/emqtt/src/emqtt_protocol.erl new file mode 100644 index 000000000..66af58d4f --- /dev/null +++ b/apps/emqtt/src/emqtt_protocol.erl @@ -0,0 +1,167 @@ +%%----------------------------------------------------------------------------- +%% Copyright (c) 2014, Feng Lee +%% +%% Permission is hereby granted, free of charge, to any person obtaining a copy +%% of this software and associated documentation files (the "Software"), to deal +%% in the Software without restriction, including without limitation the rights +%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%% copies of the Software, and to permit persons to whom the Software is +%% furnished to do so, subject to the following conditions: +%% +%% The above copyright notice and this permission notice shall be included in all +%% copies or substantial portions of the Software. +%% +%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%% SOFTWARE. +%%------------------------------------------------------------------------------ + +-module(emqtt_protocol). + +-include("emqtt.hrl"). + +-include("emqtt_log.hrl"). + +-include("emqtt_frame.hrl"). + +-export([process_request/3]). + +process_request(?CONNECT, + #mqtt_frame{ variable = #mqtt_frame_connect{ + username = Username, + password = Password, + proto_ver = ProtoVersion, + clean_sess = CleanSess, + keep_alive = AlivePeriod, + client_id = ClientId } = Var}, #state{socket = Sock} = State) -> + {ReturnCode, State1} = + case {lists:member(ProtoVersion, proplists:get_keys(?PROTOCOL_NAMES)), + valid_client_id(ClientId)} of + {false, _} -> + {?CONNACK_PROTO_VER, State}; + {_, false} -> + {?CONNACK_INVALID_ID, State}; + _ -> + case emqtt_auth:check(Username, Password) of + false -> + ?ERROR_MSG("MQTT login failed - no credentials"), + {?CONNACK_CREDENTIALS, State}; + true -> + ?INFO("connect from clientid: ~p, ~p", [ClientId, AlivePeriod]), + emqtt_cm:create(ClientId, self()), + KeepAlive = emqtt_keep_alive:new(AlivePeriod*1500, keep_alive_timeout), + {?CONNACK_ACCEPT, + State #state{ will_msg = make_will_msg(Var), + client_id = ClientId, + keep_alive = KeepAlive}} + end + end, + ?INFO("recv conn...:~p", [ReturnCode]), + send_frame(Sock, #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?CONNACK}, + variable = #mqtt_frame_connack{ + return_code = ReturnCode }}), + {ok, State1}; + +process_request(?PUBLISH, Frame=#mqtt_frame{ + fixed = #mqtt_frame_fixed{qos = ?QOS_0}}, State) -> + emqtt_pubsub:publish(make_msg(Frame)), + {ok, State}; + +process_request(?PUBLISH, + Frame=#mqtt_frame{ + fixed = #mqtt_frame_fixed{qos = ?QOS_1}, + variable = #mqtt_frame_publish{message_id = MsgId}}, + State=#state{socket=Sock}) -> + emqtt_pubsub:publish(make_msg(Frame)), + send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{ type = ?PUBACK }, + variable = #mqtt_frame_publish{ message_id = MsgId}}), + {ok, State}; + +process_request(?PUBLISH, + Frame=#mqtt_frame{ + fixed = #mqtt_frame_fixed{qos = ?QOS_2}, + variable = #mqtt_frame_publish{message_id = MsgId}}, + State=#state{socket=Sock}) -> + emqtt_pubsub:publish(make_msg(Frame)), + put({msg, MsgId}, pubrec), + send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{type = ?PUBREC}, + variable = #mqtt_frame_publish{ message_id = MsgId}}), + + {ok, State}; + +process_request(?PUBACK, #mqtt_frame{}, State) -> + %TODO: fixme later + {ok, State}; + +process_request(?PUBREC, #mqtt_frame{ + variable = #mqtt_frame_publish{message_id = MsgId}}, + State=#state{socket=Sock}) -> + %TODO: fixme later + send_frame(Sock, + #mqtt_frame{fixed = #mqtt_frame_fixed{ type = ?PUBREL}, + variable = #mqtt_frame_publish{ message_id = MsgId}}), + {ok, State}; + +process_request(?PUBREL, + #mqtt_frame{ + variable = #mqtt_frame_publish{message_id = MsgId}}, + State=#state{socket=Sock}) -> + erase({msg, MsgId}), + send_frame(Sock, + #mqtt_frame{fixed = #mqtt_frame_fixed{ type = ?PUBCOMP}, + variable = #mqtt_frame_publish{ message_id = MsgId}}), + {ok, State}; + +process_request(?PUBCOMP, #mqtt_frame{ + variable = #mqtt_frame_publish{message_id = _MsgId}}, State) -> + %TODO: fixme later + {ok, State}; + +process_request(?SUBSCRIBE, + #mqtt_frame{ + variable = #mqtt_frame_subscribe{message_id = MessageId, + topic_table = Topics}, + payload = undefined}, + #state{socket=Sock} = State) -> + + [emqtt_pubsub:subscribe({Name, Qos}, self()) || + #mqtt_topic{name=Name, qos=Qos} <- Topics], + + GrantedQos = [Qos || #mqtt_topic{qos=Qos} <- Topics], + + send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{type = ?SUBACK}, + variable = #mqtt_frame_suback{ + message_id = MessageId, + qos_table = GrantedQos}}), + + {ok, State}; + +process_request(?UNSUBSCRIBE, + #mqtt_frame{ + variable = #mqtt_frame_subscribe{message_id = MessageId, + topic_table = Topics }, + payload = undefined}, #state{socket = Sock, client_id = ClientId} = State) -> + + + [emqtt_pubsub:unsubscribe(Name, self()) || #mqtt_topic{name=Name} <- Topics], + + send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{type = ?UNSUBACK }, + variable = #mqtt_frame_suback{message_id = MessageId }}), + + {ok, State}; + +process_request(?PINGREQ, #mqtt_frame{}, #state{socket=Sock, keep_alive=KeepAlive}=State) -> + %Keep alive timer + KeepAlive1 = emqtt_keep_alive:reset(KeepAlive), + send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{ type = ?PINGRESP }}), + {ok, State#state{keep_alive=KeepAlive1}}; + +process_request(?DISCONNECT, #mqtt_frame{}, State=#state{client_id=ClientId}) -> + ?INFO("~s disconnected", [ClientId]), + {stop, State}. + + diff --git a/apps/emqtt/src/emqtt_sup.erl b/apps/emqtt/src/emqtt_sup.erl index 7a0d52b75..de3d053b3 100644 --- a/apps/emqtt/src/emqtt_sup.erl +++ b/apps/emqtt/src/emqtt_sup.erl @@ -60,12 +60,5 @@ start_child(Mod, Type) when is_atom(Mod) and is_atom(Type) -> %% =================================================================== init([]) -> - {ok, { {one_for_all, 5, 10}, [ - ?CHILD(emqtt_cm, worker), - ?CHILD(emqtt_monitor, worker), - ?CHILD(emqtt_auth, worker), - ?CHILD(emqtt_retained, worker), - ?CHILD(emqtt_pubsub, worker), - ?CHILD(emqtt_registry, worker)]} - }. + {ok, { {one_for_all, 5, 10}, [] } }. diff --git a/apps/emqtt/src/emqtt_topic.erl b/apps/emqtt/src/emqtt_topic.erl index 5c45e5868..29d9d865d 100644 --- a/apps/emqtt/src/emqtt_topic.erl +++ b/apps/emqtt/src/emqtt_topic.erl @@ -111,7 +111,7 @@ validate({subscribe, Topic}) when is_binary(Topic) -> valid(words(Topic)); validate({publish, Topic}) when is_binary(Topic) -> Words = words(Topic), - valid(Words) and (not include_wildcard(Words)). + valid(Words) and (not include_wildcard(Topic)). triples(B) when is_binary(B) -> triples(binary_to_list(B), []). @@ -152,5 +152,5 @@ include_wildcard(<<$#, _T/binary>>) -> true; include_wildcard(<<$+, _T/binary>>) -> true; include_wildcard(<<_H, T/binary>>) -> include_wildcard(T). -l2b(L) when is_list(L) -> list_to_binary(L). +l2b(L) -> list_to_binary(L). diff --git a/doc/emqtt.png b/doc/emqtt.png new file mode 100644 index 000000000..df6462e86 Binary files /dev/null and b/doc/emqtt.png differ