Merge branch 'dev' of github.com:emqtt/emqtt into dev

This commit is contained in:
Ery Lee 2014-12-29 10:59:10 +08:00
commit 7ab3194747
15 changed files with 321 additions and 243 deletions

56
CHANGELOG.md Normal file
View File

@ -0,0 +1,56 @@
eMQTT ChangeLog
==================
0.2.0 (2014-12-07)
-------------------
rewrite the project, integrate with esockd, mochiweb
support MQTT 3.1.1
support HTTP to publish message
0.1.5 (2013-01-05)
-------------------
Bugfix: remove QOS_1 match when handle PUBREL request
Bugfix: reverse word in emqtt_topic:words/1 function
0.1.4 (2013-01-04)
-------------------
Bugfix: fix "mosquitto_sub -q 2 ......" bug
Bugfix: fix keep alive bug
0.1.3 (2012-01-04)
-------------------
Feature: support QOS2 PUBREC, PUBREL,PUBCOMP messages
Bugfix: fix emqtt_frame to encode/decoe PUBREC/PUBREL messages
0.1.2 (2012-12-27)
-------------------
Feature: release support like riak
Bugfix: use ?INFO/?ERROR to print log in tcp_listener.erl
0.1.1 (2012-09-24)
-------------------
Feature: use rebar to generate release
Feature: support retained messages
Bugfix: send will msg when network error
0.1.0 (2012-09-21)
-------------------
The first public release.

39
CHANGES
View File

@ -1,39 +0,0 @@
Changes with emqtt 0.1.5 05 Jan 2012
*) Bugfix: remove QOS_1 match when handle PUBREL request
*) Bugfix: reverse word in emqtt_topic:words/1 function
Changes with emqtt 0.1.4 04 Jan 2012
*) Bugfix: fix "mosquitto_sub -q 2 ......" bug
*) Bugfix: fix keep alive bug
Changes with emqtt 0.1.3 04 Jan 2012
*) Feature: support QOS2 PUBREC, PUBREL,PUBCOMP messages
*) Bugfix: fix emqtt_frame to encode/decoe PUBREC/PUBREL messages
Changes with emqtt 0.1.2 27 Dec 2012
*) Feature: release support like riak
*) Bugfix: use ?INFO/?ERROR to print log in tcp_listener.erl
Changes with emqtt 0.1.1 24 Dec 2012
*) Feature: use rebar to generate release
*) Feature: support retained messages
*) Bugfix: send will msg when network error
Changes with emqtt 0.1.0 21 Dec 2012
*) The first public release.

View File

@ -1,6 +1,6 @@
The MIT License (MIT) The MIT License (MIT)
Copyright (c) 2014, Feng Lee <feng.lee@slimchat.io> Copyright (c) 2014, Feng Lee <feng@slimchat.io>
Permission is hereby granted, free of charge, to any person obtaining a copy Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal of this software and associated documentation files (the "Software"), to deal

View File

@ -1,74 +1,85 @@
emqtt # eMQTT
=====
erlang mqtt broker. eMQTT is a scalable, fault-tolerant and extensible mqtt broker written in Erlang/OTP.
requires eMQTT support MQTT V3.1 Protocol Specification.
========
erlang R15B+ eMQTT requires Erlang R17+.
git client ## Startup in Five Minutes
build ```
======= $ git clone git://github.com/slimpp/emqtt.git
make $ cd emqtt
release $ make && make dist
=======
make generate $ cd rel/emqtt
deloy $ ./bin/emqtt console
===== ```
## Deploy and Start
### start
```
cp -R rel/emqtt $INSTALL_DIR cp -R rel/emqtt $INSTALL_DIR
start cd $INSTALL_DIR/emqtt
======
cd $INSTALL_DRI/emqtt
./bin/emqtt console
or
./bin/emqtt start ./bin/emqtt start
status ```
======
./bin/emqtt_ctl status ### stop
stop
====
```
./bin/emqtt stop ./bin/emqtt stop
logs ```
====
log/* ## Configuration
http api ......
========
curl -v --basic -u user:passwd -d "topic=/abc&message=akakakk&qos=0" -k http://localhost:8883/mqtt/publish ## Admin and Cluster
design ......
=====
https://github.com/slimpp/emqtt/wiki ## HTTP API
author eMQTT support http to publish message.
=====
Ery Lee <ery.lee at gmail dot com> Example:
```
curl -v --basic -u user:passwd -d "topic=/a/b/c&message=hello from http..." -k http://localhost:8883/mqtt/publish
```
license ### URL
======
```
HTTP POST http://host:8883/mqtt/publish
```
### Parameters
Name | Description
-----|-------------
topic | MQTT Topic
message | Text Message
## Design
[Design Wiki](https://github.com/slimpp/emqtt/wiki)
## License
The MIT License (MIT) The MIT License (MIT)
## Author
feng at slimchat.io

View File

@ -18,11 +18,15 @@
%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%% %%
-define(PROTOCOL_NAMES, [{3, "MQIsdp"}, {4, "MQTT"}]). -define(CLIENT_ID_MAXLEN, 1024).
-define(PROTOCOL_NAMES, [{3, <<"MQIsdp">>}, {4, <<"MQTT">>}]).
-define(MQTT_PROTO_MAJOR, 3). -define(MQTT_PROTO_MAJOR, 3).
-define(MQTT_PROTO_MINOR, 1). -define(MQTT_PROTO_MINOR, 1).
-define(CLIENT_ID_MAXLEN, 23).
%% frame types %% frame types
-define(CONNECT, 1). -define(CONNECT, 1).
@ -49,6 +53,21 @@
-define(CONNACK_CREDENTIALS, 4). %% bad user name or password -define(CONNACK_CREDENTIALS, 4). %% bad user name or password
-define(CONNACK_AUTH, 5). %% not authorized -define(CONNACK_AUTH, 5). %% not authorized
-record(state, {socket,
conn_name,
await_recv,
connection_state,
conserve,
parse_state,
message_id,
client_id,
clean_sess,
will_msg,
keep_alive,
awaiting_ack,
subtopics,
awaiting_rel}).
-record(mqtt_frame, {fixed, -record(mqtt_frame, {fixed,
variable, variable,

View File

@ -30,7 +30,7 @@
-export([start_link/0, -export([start_link/0,
add/2, add/2,
check/2, check/1, check/2,
delete/1]). delete/1]).
-behavior(gen_server). -behavior(gen_server).
@ -42,9 +42,15 @@
terminate/2, terminate/2,
code_change/3]). code_change/3]).
-define(TAB, ?MODULE).
start_link() -> start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-spec check({Usename :: binary(), Password :: binary()}) -> true | false.
check({Username, Password}) ->
execute(check, [Username, Password]).
-spec check(Usename :: binary(), Password :: binary()) -> true | false. -spec check(Usename :: binary(), Password :: binary()) -> true | false.
check(Username, Password) -> check(Username, Password) ->
execute(check, [Username, Password]). execute(check, [Username, Password]).
@ -58,15 +64,15 @@ delete(Username) ->
execute(delete, [Username]). execute(delete, [Username]).
execute(F, Args) -> execute(F, Args) ->
[{_, M}] = ets:lookup(emqtt_auth, mod), [{_, M}] = ets:lookup(?TAB, mod),
apply(M, F, Args). apply(M, F, Args).
init([]) -> init([]) ->
{ok, {Name, Opts}} = application:get_env(auth), {ok, {Name, Opts}} = application:get_env(auth),
AuthMod = authmod(Name), AuthMod = authmod(Name),
ok = AuthMod:init(Opts), ok = AuthMod:init(Opts),
ets:new(emqtt_auth, [named_table, protected]), ets:new(?TAB, [named_table, protected]),
ets:insert(emqtt_quth, {mod, AuthMod}), ets:insert(?TAB, {mod, AuthMod}),
?PRINT("emqtt authmod is ~p", [AuthMod]), ?PRINT("emqtt authmod is ~p", [AuthMod]),
{ok, undefined}. {ok, undefined}.

View File

@ -22,6 +22,8 @@
-module(emqtt_auth_internal). -module(emqtt_auth_internal).
-author('feng@slimchat.io').
-include("emqtt.hrl"). -include("emqtt.hrl").
-export([init/1, -export([init/1,

View File

@ -43,8 +43,6 @@
-include("emqtt_frame.hrl"). -include("emqtt_frame.hrl").
-define(CLIENT_ID_MAXLEN, 23).
-record(state, {socket, -record(state, {socket,
conn_name, conn_name,
await_recv, await_recv,
@ -233,140 +231,6 @@ process_frame(Frame = #mqtt_frame{fixed = #mqtt_frame_fixed{type = Type}},
{err, Reason, State} {err, Reason, State}
end. end.
process_request(?CONNECT,
#mqtt_frame{ variable = #mqtt_frame_connect{
username = Username,
password = Password,
proto_ver = ProtoVersion,
clean_sess = CleanSess,
keep_alive = AlivePeriod,
client_id = ClientId } = Var}, #state{socket = Sock} = State) ->
{ReturnCode, State1} =
case {ProtoVersion =:= ?MQTT_PROTO_MAJOR,
valid_client_id(ClientId)} of
{false, _} ->
{?CONNACK_PROTO_VER, State};
{_, false} ->
{?CONNACK_INVALID_ID, State};
_ ->
case emqtt_auth:check(Username, Password) of
false ->
?ERROR_MSG("MQTT login failed - no credentials"),
{?CONNACK_CREDENTIALS, State};
true ->
?INFO("connect from clientid: ~p, ~p", [ClientId, AlivePeriod]),
emqtt_cm:create(ClientId, self()),
KeepAlive = emqtt_keep_alive:new(AlivePeriod*1500, keep_alive_timeout),
{?CONNACK_ACCEPT,
State #state{ will_msg = make_will_msg(Var),
client_id = ClientId,
keep_alive = KeepAlive}}
end
end,
?INFO("recv conn...:~p", [ReturnCode]),
send_frame(Sock, #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?CONNACK},
variable = #mqtt_frame_connack{
return_code = ReturnCode }}),
{ok, State1};
process_request(?PUBLISH, Frame=#mqtt_frame{
fixed = #mqtt_frame_fixed{qos = ?QOS_0}}, State) ->
emqtt_pubsub:publish(make_msg(Frame)),
{ok, State};
process_request(?PUBLISH,
Frame=#mqtt_frame{
fixed = #mqtt_frame_fixed{qos = ?QOS_1},
variable = #mqtt_frame_publish{message_id = MsgId}},
State=#state{socket=Sock}) ->
emqtt_pubsub:publish(make_msg(Frame)),
send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{ type = ?PUBACK },
variable = #mqtt_frame_publish{ message_id = MsgId}}),
{ok, State};
process_request(?PUBLISH,
Frame=#mqtt_frame{
fixed = #mqtt_frame_fixed{qos = ?QOS_2},
variable = #mqtt_frame_publish{message_id = MsgId}},
State=#state{socket=Sock}) ->
emqtt_pubsub:publish(make_msg(Frame)),
put({msg, MsgId}, pubrec),
send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{type = ?PUBREC},
variable = #mqtt_frame_publish{ message_id = MsgId}}),
{ok, State};
process_request(?PUBACK, #mqtt_frame{}, State) ->
%TODO: fixme later
{ok, State};
process_request(?PUBREC, #mqtt_frame{
variable = #mqtt_frame_publish{message_id = MsgId}},
State=#state{socket=Sock}) ->
%TODO: fixme later
send_frame(Sock,
#mqtt_frame{fixed = #mqtt_frame_fixed{ type = ?PUBREL},
variable = #mqtt_frame_publish{ message_id = MsgId}}),
{ok, State};
process_request(?PUBREL,
#mqtt_frame{
variable = #mqtt_frame_publish{message_id = MsgId}},
State=#state{socket=Sock}) ->
erase({msg, MsgId}),
send_frame(Sock,
#mqtt_frame{fixed = #mqtt_frame_fixed{ type = ?PUBCOMP},
variable = #mqtt_frame_publish{ message_id = MsgId}}),
{ok, State};
process_request(?PUBCOMP, #mqtt_frame{
variable = #mqtt_frame_publish{message_id = _MsgId}}, State) ->
%TODO: fixme later
{ok, State};
process_request(?SUBSCRIBE,
#mqtt_frame{
variable = #mqtt_frame_subscribe{message_id = MessageId,
topic_table = Topics},
payload = undefined},
#state{socket=Sock} = State) ->
[emqtt_pubsub:subscribe({Name, Qos}, self()) ||
#mqtt_topic{name=Name, qos=Qos} <- Topics],
GrantedQos = [Qos || #mqtt_topic{qos=Qos} <- Topics],
send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{type = ?SUBACK},
variable = #mqtt_frame_suback{
message_id = MessageId,
qos_table = GrantedQos}}),
{ok, State};
process_request(?UNSUBSCRIBE,
#mqtt_frame{
variable = #mqtt_frame_subscribe{message_id = MessageId,
topic_table = Topics },
payload = undefined}, #state{socket = Sock, client_id = ClientId} = State) ->
[emqtt_pubsub:unsubscribe(Name, self()) || #mqtt_topic{name=Name} <- Topics],
send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{type = ?UNSUBACK },
variable = #mqtt_frame_suback{message_id = MessageId }}),
{ok, State};
process_request(?PINGREQ, #mqtt_frame{}, #state{socket=Sock, keep_alive=KeepAlive}=State) ->
%Keep alive timer
KeepAlive1 = emqtt_keep_alive:reset(KeepAlive),
send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{ type = ?PINGRESP }}),
{ok, State#state{keep_alive=KeepAlive1}};
process_request(?DISCONNECT, #mqtt_frame{}, State=#state{client_id=ClientId}) ->
?INFO("~s disconnected", [ClientId]),
{stop, State}.
next_msg_id(State = #state{ message_id = 16#ffff }) -> next_msg_id(State = #state{ message_id = 16#ffff }) ->
State #state{ message_id = 1 }; State #state{ message_id = 1 };
next_msg_id(State = #state{ message_id = MsgId }) -> next_msg_id(State = #state{ message_id = MsgId }) ->
@ -428,7 +292,7 @@ stop(Reason, State ) ->
{stop, Reason, State}. {stop, Reason, State}.
valid_client_id(ClientId) -> valid_client_id(ClientId) ->
ClientIdLen = length(ClientId), ClientIdLen = size(ClientId),
1 =< ClientIdLen andalso ClientIdLen =< ?CLIENT_ID_MAXLEN. 1 =< ClientIdLen andalso ClientIdLen =< ?CLIENT_ID_MAXLEN.
handle_retained(?PUBLISH, #mqtt_frame{fixed = #mqtt_frame_fixed{retain = false}}) -> handle_retained(?PUBLISH, #mqtt_frame{fixed = #mqtt_frame_fixed{retain = false}}) ->

View File

@ -161,7 +161,7 @@ parse_utf(Bin, _) ->
parse_utf(Bin). parse_utf(Bin).
parse_utf(<<Len:16/big, Str:Len/binary, Rest/binary>>) -> parse_utf(<<Len:16/big, Str:Len/binary, Rest/binary>>) ->
{binary_to_list(Str), Rest}. {Str, Rest}.
parse_msg(Bin, 0) -> parse_msg(Bin, 0) ->
{undefined, Bin}; {undefined, Bin};

View File

@ -26,6 +26,8 @@
-include("emqtt.hrl"). -include("emqtt.hrl").
-include("emqtt_log.hrl").
-import(proplists, [get_value/2, get_value/3]). -import(proplists, [get_value/2, get_value/3]).
-export([handle/1]). -export([handle/1]).
@ -43,8 +45,8 @@ handle(Req) ->
handle('POST', "/mqtt/publish", Req) -> handle('POST', "/mqtt/publish", Req) ->
Params = mochiweb_request:parse_post(Req), Params = mochiweb_request:parse_post(Req),
error_logger:info_msg("~p~n", [Params]), ?INFO("~p~n", [Params]),
Topic = get_value("topic", Params), Topic = list_to_binary(get_value("topic", Params)),
Message = list_to_binary(get_value("message", Params)), Message = list_to_binary(get_value("message", Params)),
emqtt_pubsub:publish(#mqtt_msg { emqtt_pubsub:publish(#mqtt_msg {
retain = 0, retain = 0,
@ -66,12 +68,9 @@ authorized(Req) ->
undefined -> undefined ->
false; false;
"Basic " ++ BasicAuth -> "Basic " ++ BasicAuth ->
{Username, Password} = user_passwd(BasicAuth), emqtt_auth:check(user_passwd(BasicAuth))
emqtt_auth:check(Username, Password)
end. end.
user_passwd(BasicAuth) -> user_passwd(BasicAuth) ->
[U, P] = binary:split(base64:decode(BasicAuth), <<":">>), list_to_tuple(binary:split(base64:decode(BasicAuth), <<":">>)).
{binary_to_list(U), binary_to_list(P)}.

View File

@ -24,7 +24,7 @@
-author('feng@slimchat.io'). -author('feng@slimchat.io').
-export([tcp_name/3, tcp_host/1, getaddr/2, port_to_listeners/1]). -export([tcp_name/3, tcp_host/1, getopts/2, setopts/2, getaddr/2, port_to_listeners/1]).
-export([connection_string/2]). -export([connection_string/2]).

View File

@ -0,0 +1,167 @@
%%-----------------------------------------------------------------------------
%% Copyright (c) 2014, Feng Lee <feng@slimchat.io>
%%
%% Permission is hereby granted, free of charge, to any person obtaining a copy
%% of this software and associated documentation files (the "Software"), to deal
%% in the Software without restriction, including without limitation the rights
%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%% copies of the Software, and to permit persons to whom the Software is
%% furnished to do so, subject to the following conditions:
%%
%% The above copyright notice and this permission notice shall be included in all
%% copies or substantial portions of the Software.
%%
%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%% SOFTWARE.
%%------------------------------------------------------------------------------
-module(emqtt_protocol).
-include("emqtt.hrl").
-include("emqtt_log.hrl").
-include("emqtt_frame.hrl").
-export([process_request/3]).
process_request(?CONNECT,
#mqtt_frame{ variable = #mqtt_frame_connect{
username = Username,
password = Password,
proto_ver = ProtoVersion,
clean_sess = CleanSess,
keep_alive = AlivePeriod,
client_id = ClientId } = Var}, #state{socket = Sock} = State) ->
{ReturnCode, State1} =
case {lists:member(ProtoVersion, proplists:get_keys(?PROTOCOL_NAMES)),
valid_client_id(ClientId)} of
{false, _} ->
{?CONNACK_PROTO_VER, State};
{_, false} ->
{?CONNACK_INVALID_ID, State};
_ ->
case emqtt_auth:check(Username, Password) of
false ->
?ERROR_MSG("MQTT login failed - no credentials"),
{?CONNACK_CREDENTIALS, State};
true ->
?INFO("connect from clientid: ~p, ~p", [ClientId, AlivePeriod]),
emqtt_cm:create(ClientId, self()),
KeepAlive = emqtt_keep_alive:new(AlivePeriod*1500, keep_alive_timeout),
{?CONNACK_ACCEPT,
State #state{ will_msg = make_will_msg(Var),
client_id = ClientId,
keep_alive = KeepAlive}}
end
end,
?INFO("recv conn...:~p", [ReturnCode]),
send_frame(Sock, #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?CONNACK},
variable = #mqtt_frame_connack{
return_code = ReturnCode }}),
{ok, State1};
process_request(?PUBLISH, Frame=#mqtt_frame{
fixed = #mqtt_frame_fixed{qos = ?QOS_0}}, State) ->
emqtt_pubsub:publish(make_msg(Frame)),
{ok, State};
process_request(?PUBLISH,
Frame=#mqtt_frame{
fixed = #mqtt_frame_fixed{qos = ?QOS_1},
variable = #mqtt_frame_publish{message_id = MsgId}},
State=#state{socket=Sock}) ->
emqtt_pubsub:publish(make_msg(Frame)),
send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{ type = ?PUBACK },
variable = #mqtt_frame_publish{ message_id = MsgId}}),
{ok, State};
process_request(?PUBLISH,
Frame=#mqtt_frame{
fixed = #mqtt_frame_fixed{qos = ?QOS_2},
variable = #mqtt_frame_publish{message_id = MsgId}},
State=#state{socket=Sock}) ->
emqtt_pubsub:publish(make_msg(Frame)),
put({msg, MsgId}, pubrec),
send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{type = ?PUBREC},
variable = #mqtt_frame_publish{ message_id = MsgId}}),
{ok, State};
process_request(?PUBACK, #mqtt_frame{}, State) ->
%TODO: fixme later
{ok, State};
process_request(?PUBREC, #mqtt_frame{
variable = #mqtt_frame_publish{message_id = MsgId}},
State=#state{socket=Sock}) ->
%TODO: fixme later
send_frame(Sock,
#mqtt_frame{fixed = #mqtt_frame_fixed{ type = ?PUBREL},
variable = #mqtt_frame_publish{ message_id = MsgId}}),
{ok, State};
process_request(?PUBREL,
#mqtt_frame{
variable = #mqtt_frame_publish{message_id = MsgId}},
State=#state{socket=Sock}) ->
erase({msg, MsgId}),
send_frame(Sock,
#mqtt_frame{fixed = #mqtt_frame_fixed{ type = ?PUBCOMP},
variable = #mqtt_frame_publish{ message_id = MsgId}}),
{ok, State};
process_request(?PUBCOMP, #mqtt_frame{
variable = #mqtt_frame_publish{message_id = _MsgId}}, State) ->
%TODO: fixme later
{ok, State};
process_request(?SUBSCRIBE,
#mqtt_frame{
variable = #mqtt_frame_subscribe{message_id = MessageId,
topic_table = Topics},
payload = undefined},
#state{socket=Sock} = State) ->
[emqtt_pubsub:subscribe({Name, Qos}, self()) ||
#mqtt_topic{name=Name, qos=Qos} <- Topics],
GrantedQos = [Qos || #mqtt_topic{qos=Qos} <- Topics],
send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{type = ?SUBACK},
variable = #mqtt_frame_suback{
message_id = MessageId,
qos_table = GrantedQos}}),
{ok, State};
process_request(?UNSUBSCRIBE,
#mqtt_frame{
variable = #mqtt_frame_subscribe{message_id = MessageId,
topic_table = Topics },
payload = undefined}, #state{socket = Sock, client_id = ClientId} = State) ->
[emqtt_pubsub:unsubscribe(Name, self()) || #mqtt_topic{name=Name} <- Topics],
send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{type = ?UNSUBACK },
variable = #mqtt_frame_suback{message_id = MessageId }}),
{ok, State};
process_request(?PINGREQ, #mqtt_frame{}, #state{socket=Sock, keep_alive=KeepAlive}=State) ->
%Keep alive timer
KeepAlive1 = emqtt_keep_alive:reset(KeepAlive),
send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{ type = ?PINGRESP }}),
{ok, State#state{keep_alive=KeepAlive1}};
process_request(?DISCONNECT, #mqtt_frame{}, State=#state{client_id=ClientId}) ->
?INFO("~s disconnected", [ClientId]),
{stop, State}.

View File

@ -60,12 +60,5 @@ start_child(Mod, Type) when is_atom(Mod) and is_atom(Type) ->
%% =================================================================== %% ===================================================================
init([]) -> init([]) ->
{ok, { {one_for_all, 5, 10}, [ {ok, { {one_for_all, 5, 10}, [] } }.
?CHILD(emqtt_cm, worker),
?CHILD(emqtt_monitor, worker),
?CHILD(emqtt_auth, worker),
?CHILD(emqtt_retained, worker),
?CHILD(emqtt_pubsub, worker),
?CHILD(emqtt_registry, worker)]}
}.

View File

@ -111,7 +111,7 @@ validate({subscribe, Topic}) when is_binary(Topic) ->
valid(words(Topic)); valid(words(Topic));
validate({publish, Topic}) when is_binary(Topic) -> validate({publish, Topic}) when is_binary(Topic) ->
Words = words(Topic), Words = words(Topic),
valid(Words) and (not include_wildcard(Words)). valid(Words) and (not include_wildcard(Topic)).
triples(B) when is_binary(B) -> triples(B) when is_binary(B) ->
triples(binary_to_list(B), []). triples(binary_to_list(B), []).
@ -152,5 +152,5 @@ include_wildcard(<<$#, _T/binary>>) -> true;
include_wildcard(<<$+, _T/binary>>) -> true; include_wildcard(<<$+, _T/binary>>) -> true;
include_wildcard(<<_H, T/binary>>) -> include_wildcard(T). include_wildcard(<<_H, T/binary>>) -> include_wildcard(T).
l2b(L) when is_list(L) -> list_to_binary(L). l2b(L) -> list_to_binary(L).

BIN
doc/emqtt.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 8.7 KiB