rename project name from 'emqtt' to 'emqttd'

This commit is contained in:
Ery Lee 2015-03-10 20:09:13 +08:00
parent 83c376ad83
commit aca536cd8a
77 changed files with 1799 additions and 1527 deletions

2
.gitignore vendored
View File

@ -6,7 +6,7 @@ deps
*.plt
erl_crash.dump
ebin
rel/emqtt
rel/emqttd
.concrete/DEV_MODE
.rebar
test/ebin/*.beam

View File

@ -2,8 +2,14 @@
eMQTT ChangeLog
==================
v0.5.0-alpha (2015-03-20)
-------------------------
rename 'eMQTT' to 'eMQTTD'
v0.4.0-alpha (2015-03-10)
------------------------
-------------------------
Support [$SYS Topics of Broker](https://github.com/emqtt/emqtt/wiki/$SYS-Topics-of-Broker) Now!

View File

@ -1,128 +0,0 @@
%%-----------------------------------------------------------------------------
%% Copyright (c) 2012-2015, Feng Lee <feng@emqtt.io>
%%
%% Permission is hereby granted, free of charge, to any person obtaining a copy
%% of this software and associated documentation files (the "Software"), to deal
%% in the Software without restriction, including without limitation the rights
%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%% copies of the Software, and to permit persons to whom the Software is
%% furnished to do so, subject to the following conditions:
%%
%% The above copyright notice and this permission notice shall be included in all
%% copies or substantial portions of the Software.
%%
%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%% SOFTWARE.
%%------------------------------------------------------------------------------
-module(emqtt_app).
-author('feng@emqtt.io').
-behaviour(application).
%% Application callbacks
-export([start/2, stop/1]).
-define(PRINT_MSG(Msg), io:format(Msg)).
-define(PRINT(Format, Args), io:format(Format, Args)).
%% ===================================================================
%% Application callbacks
%% ===================================================================
%%
%% @spec start(atom(), list()) -> {ok, pid()}
%%
start(_StartType, _StartArgs) ->
print_banner(),
{ok, Sup} = emqtt_sup:start_link(),
start_servers(Sup),
{ok, Listeners} = application:get_env(listen),
emqtt:open(Listeners),
register(emqtt, self()),
print_vsn(),
{ok, Sup}.
print_banner() ->
?PRINT("starting emqtt on node '~s'~n", [node()]).
print_vsn() ->
{ok, Vsn} = application:get_key(vsn),
{ok, Desc} = application:get_key(description),
?PRINT("~s ~s is running now~n", [Desc, Vsn]).
start_servers(Sup) ->
{ok, SessOpts} = application:get_env(session),
{ok, RetainOpts} = application:get_env(retain),
{ok, BrokerOpts} = application:get_env(broker),
{ok, MetricOpts} = application:get_env(metrics),
lists:foreach(
fun({Name, F}) when is_function(F) ->
?PRINT("~s is starting...", [Name]),
F(),
?PRINT_MSG("[done]~n");
({Name, Server}) ->
?PRINT("~s is starting...", [Name]),
start_child(Sup, Server),
?PRINT_MSG("[done]~n");
({Name, Server, Opts}) ->
?PRINT("~s is starting...", [ Name]),
start_child(Sup, Server, Opts),
?PRINT_MSG("[done]~n")
end,
[{"emqtt config", emqtt_config},
{"emqtt server", emqtt_server, RetainOpts},
{"emqtt client manager", emqtt_cm},
{"emqtt session manager", emqtt_sm},
{"emqtt session supervisor", {supervisor, emqtt_session_sup}, SessOpts},
{"emqtt auth", emqtt_auth},
{"emqtt pubsub", emqtt_pubsub},
{"emqtt router", emqtt_router},
{"emqtt broker", emqtt_broker, BrokerOpts},
{"emqtt metrics", emqtt_metrics, MetricOpts},
{"emqtt monitor", emqtt_monitor}
]).
start_child(Sup, {supervisor, Name}) ->
supervisor:start_child(Sup, supervisor_spec(Name));
start_child(Sup, Name) when is_atom(Name) ->
{ok, _ChiId} = supervisor:start_child(Sup, worker_spec(Name)).
start_child(Sup, {supervisor, Name}, Opts) ->
supervisor:start_child(Sup, supervisor_spec(Name, Opts));
start_child(Sup, Name, Opts) when is_atom(Name) ->
{ok, _ChiId} = supervisor:start_child(Sup, worker_spec(Name, Opts)).
%%TODO: refactor...
supervisor_spec(Name) ->
{Name,
{Name, start_link, []},
permanent, infinity, supervisor, [Name]}.
supervisor_spec(Name, Opts) ->
{Name,
{Name, start_link, [Opts]},
permanent, infinity, supervisor, [Name]}.
worker_spec(Name) ->
{Name,
{Name, start_link, []},
permanent, 5000, worker, [Name]}.
worker_spec(Name, Opts) ->
{Name,
{Name, start_link, [Opts]},
permanent, 5000, worker, [Name]}.
%%
%% @spec stop(atom) -> 'ok'
%%
stop(_State) ->
ok.

View File

@ -1,93 +0,0 @@
%%-----------------------------------------------------------------------------
%% Copyright (c) 2012-2015, Feng Lee <feng@emqtt.io>
%%
%% Permission is hereby granted, free of charge, to any person obtaining a copy
%% of this software and associated documentation files (the "Software"), to deal
%% in the Software without restriction, including without limitation the rights
%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%% copies of the Software, and to permit persons to whom the Software is
%% furnished to do so, subject to the following conditions:
%%
%% The above copyright notice and this permission notice shall be included in all
%% copies or substantial portions of the Software.
%%
%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%% SOFTWARE.
%%------------------------------------------------------------------------------
-module(emqtt_auth).
-author('feng@emqtt.io').
-include("emqtt.hrl").
-export([start_link/0,
add/2,
check/1, check/2,
delete/1]).
-behavior(gen_server).
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3]).
-define(TAB, ?MODULE).
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-spec check({Usename :: binary(), Password :: binary()}) -> true | false.
check({Username, Password}) ->
execute(check, [Username, Password]).
-spec check(Usename :: binary(), Password :: binary()) -> true | false.
check(Username, Password) ->
execute(check, [Username, Password]).
-spec add(Usename :: binary(), Password :: binary()) -> ok.
add(Username, Password) ->
execute(add, [Username, Password]).
-spec delete(Username :: binary()) -> ok.
delete(Username) ->
execute(delete, [Username]).
execute(F, Args) ->
[{_, M}] = ets:lookup(?TAB, mod),
apply(M, F, Args).
init([]) ->
{ok, {Name, Opts}} = application:get_env(auth),
AuthMod = authmod(Name),
ok = AuthMod:init(Opts),
ets:new(?TAB, [named_table, protected]),
ets:insert(?TAB, {mod, AuthMod}),
{ok, undefined}.
authmod(Name) when is_atom(Name) ->
list_to_atom(lists:concat(["emqtt_auth_", Name])).
handle_call(Req, _From, State) ->
{stop, {badreq, Req}, State}.
handle_cast(Msg, State) ->
{stop, {badmsg, Msg}, State}.
handle_info(Info, State) ->
{stop, {badinfo, Info}, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.

View File

@ -1,39 +0,0 @@
%%-----------------------------------------------------------------------------
%% Copyright (c) 2012-2015, Feng Lee <feng@emqtt.io>
%%
%% Permission is hereby granted, free of charge, to any person obtaining a copy
%% of this software and associated documentation files (the "Software"), to deal
%% in the Software without restriction, including without limitation the rights
%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%% copies of the Software, and to permit persons to whom the Software is
%% furnished to do so, subject to the following conditions:
%%
%% The above copyright notice and this permission notice shall be included in all
%% copies or substantial portions of the Software.
%%
%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%% SOFTWARE.
%%------------------------------------------------------------------------------
-module(emqtt_auth_anonymous).
-author('feng@emqtt.io').
-export([init/1,
add/2,
check/2,
delete/1]).
init(_Opts) -> ok.
check(_, _) -> true.
add(_, _) -> ok.
delete(_Username) -> ok.

View File

@ -1,62 +0,0 @@
%%-----------------------------------------------------------------------------
%% Copyright (c) 2012-2015, Feng Lee <feng@emqtt.io>
%%
%% Permission is hereby granted, free of charge, to any person obtaining a copy
%% of this software and associated documentation files (the "Software"), to deal
%% in the Software without restriction, including without limitation the rights
%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%% copies of the Software, and to permit persons to whom the Software is
%% furnished to do so, subject to the following conditions:
%%
%% The above copyright notice and this permission notice shall be included in all
%% copies or substantial portions of the Software.
%%
%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%% SOFTWARE.
%%------------------------------------------------------------------------------
-module(emqtt_auth_internal).
-author('feng@emqtt.io').
-include("emqtt.hrl").
-export([init/1,
add/2,
check/2,
delete/1]).
init(_Opts) ->
mnesia:create_table(mqtt_user, [
{ram_copies, [node()]},
{attributes, record_info(fields, mqtt_user)}]),
mnesia:add_table_copy(mqtt_user, node(), ram_copies),
ok.
check(undefined, _) -> false;
check(_, undefined) -> false;
check(Username, Password) when is_binary(Username), is_binary(Password) ->
PasswdHash = crypto:hash(md5, Password),
case mnesia:dirty_read(mqtt_user, Username) of
[#mqtt_user{passwdhash=PasswdHash}] -> true;
_ -> false
end.
add(Username, Password) when is_binary(Username) and is_binary(Password) ->
mnesia:dirty_write(
#mqtt_user{
username=Username,
passwdhash=crypto:hash(md5, Password)
}
).
delete(Username) when is_binary(Username) ->
mnesia:dirty_delete(mqtt_user, Username).

View File

@ -1,51 +0,0 @@
%%-----------------------------------------------------------------------------
%% Copyright (c) 2012-2015, Feng Lee <feng@emqtt.io>
%%
%% Permission is hereby granted, free of charge, to any person obtaining a copy
%% of this software and associated documentation files (the "Software"), to deal
%% in the Software without restriction, including without limitation the rights
%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%% copies of the Software, and to permit persons to whom the Software is
%% furnished to do so, subject to the following conditions:
%%
%% The above copyright notice and this permission notice shall be included in all
%% copies or substantial portions of the Software.
%%
%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%% SOFTWARE.
%%------------------------------------------------------------------------------
-module(emqtt_bridge_sup).
-author('feng@emqtt.io').
-behavior(supervisor).
-export([start_link/0, start_bridge/2, stop_bridge/1, init/1]).
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
%%TODO: FIXME LATER.
start_bridge(Name, Opts) when is_atom(Name) ->
supervisor:start_child(?MODULE, {{bridge, Name},
{eqmtt_bridge, start_link, [Opts]},
transient, 16#fffff, worker, [emqtt_bridge]}).
stop_bridge(Name) ->
ChildId = {bridge, Name},
case supervisor:terminate_child(?MODULE, ChildId) of
ok ->
supervisor:delete_child(?MODULE, ChildId);
{error, Reason} ->
{error, Reason}
end.
init([]) ->
{ok, {{one_for_one, 10, 1000}, []}}.

View File

@ -1,82 +0,0 @@
%%-----------------------------------------------------------------------------
%% Copyright (c) 2012-2015, Feng Lee <feng@emqtt.io>
%%
%% Permission is hereby granted, free of charge, to any person obtaining a copy
%% of this software and associated documentation files (the "Software"), to deal
%% in the Software without restriction, including without limitation the rights
%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%% copies of the Software, and to permit persons to whom the Software is
%% furnished to do so, subject to the following conditions:
%%
%% The above copyright notice and this permission notice shall be included in all
%% copies or substantial portions of the Software.
%%
%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%% SOFTWARE.
%%------------------------------------------------------------------------------
-module(emqtt_config).
-export([lookup/1]).
-behaviour(gen_server).
-define(SERVER, ?MODULE).
%% ------------------------------------------------------------------
%% API Function Exports
%% ------------------------------------------------------------------
-export([start_link/0]).
%% ------------------------------------------------------------------
%% gen_server Function Exports
%% ------------------------------------------------------------------
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
%% ------------------------------------------------------------------
%% API Function Definitions
%% ------------------------------------------------------------------
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
%%TODO: fix later...
lookup(Key) -> {ok, Key}.
%% ------------------------------------------------------------------
%% gen_server Function Definitions
%% ------------------------------------------------------------------
init(_Args) ->
ets:new(?MODULE, [set, protected, named_table]),
%%TODO: Load application config.
{ok, none}.
handle_call(_Request, _From, State) ->
{reply, ok, State}.
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%% ------------------------------------------------------------------
%% Internal Function Definitions
%% ------------------------------------------------------------------

View File

@ -1,93 +0,0 @@
%%-----------------------------------------------------------------------------
%% Copyright (c) 2012-2015, Feng Lee <feng@emqtt.io>
%%
%% Permission is hereby granted, free of charge, to any person obtaining a copy
%% of this software and associated documentation files (the "Software"), to deal
%% in the Software without restriction, including without limitation the rights
%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%% copies of the Software, and to permit persons to whom the Software is
%% furnished to do so, subject to the following conditions:
%%
%% The above copyright notice and this permission notice shall be included in all
%% copies or substantial portions of the Software.
%%
%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%% SOFTWARE.
%%------------------------------------------------------------------------------
-module(emqtt_ctl).
-author('feng@emqtt.io').
-include("emqtt.hrl").
-define(PRINT_MSG(Msg),
io:format(Msg)).
-define(PRINT(Format, Args),
io:format(Format, Args)).
-export([status/1,
cluster/1,
useradd/1,
userdel/1]).
status([]) ->
{InternalStatus, _ProvidedStatus} = init:get_status(),
?PRINT("Node ~p is ~p~n", [node(), InternalStatus]),
case lists:keysearch(emqtt, 1, application:which_applications()) of
false ->
?PRINT_MSG("emqtt is not running~n");
{value,_Version} ->
?PRINT_MSG("emqtt is running~n")
end.
cluster([]) ->
Nodes = [node()|nodes()],
?PRINT("cluster nodes: ~p~n", [Nodes]);
cluster([SNode]) ->
Node = node_name(SNode),
case net_adm:ping(Node) of
pong ->
application:stop(emqtt),
application:stop(esockd),
mnesia:stop(),
mnesia:start(),
mnesia:change_config(extra_db_nodes, [Node]),
application:start(esockd),
application:start(emqtt),
?PRINT("cluster with ~p successfully.~n", [Node]);
pang ->
?PRINT("failed to connect to ~p~n", [Node])
end.
useradd([Username, Password]) ->
?PRINT("~p", [emqtt_auth:add(list_to_binary(Username), list_to_binary(Password))]).
userdel([Username]) ->
?PRINT("~p", [emqtt_auth:delete(list_to_binary(Username))]).
node_name(SNode) ->
SNode1 =
case string:tokens(SNode, "@") of
[_Node, _Server] ->
SNode;
_ ->
case net_kernel:longnames() of
true ->
SNode ++ "@" ++ inet_db:gethostname() ++
"." ++ inet_db:res_option(domain);
false ->
SNode ++ "@" ++ inet_db:gethostname();
_ ->
SNode
end
end,
list_to_atom(SNode1).

View File

@ -1,40 +0,0 @@
%%-----------------------------------------------------------------------------
%% Copyright (c) 2012-2015, Feng Lee <feng@emqtt.io>
%%
%% Permission is hereby granted, free of charge, to any person obtaining a copy
%% of this software and associated documentation files (the "Software"), to deal
%% in the Software without restriction, including without limitation the rights
%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%% copies of the Software, and to permit persons to whom the Software is
%% furnished to do so, subject to the following conditions:
%%
%% The above copyright notice and this permission notice shall be included in all
%% copies or substantial portions of the Software.
%%
%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%% SOFTWARE.
%%------------------------------------------------------------------------------
-module(emqtt_db).
-author('feng@emqtt.io').
-export([init/0, stop/0]).
init() ->
case mnesia:system_info(extra_db_nodes) of
[] -> mnesia:create_schema([node()]);
_ -> ok
end,
ok = mnesia:start(),
mnesia:wait_for_tables(mnesia:system_info(local_tables), infinity).
stop() ->
mnesia:stop().

View File

@ -1,94 +0,0 @@
%%------------------------------------------------------------------------------
%% Copyright (c) 2012-2015, Feng Lee <feng@emqtt.io>
%%
%% Permission is hereby granted, free of charge, to any person obtaining a copy
%% of this software and associated documentation files (the "Software"), to deal
%% in the Software without restriction, including without limitation the rights
%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%% copies of the Software, and to permit persons to whom the Software is
%% furnished to do so, subject to the following conditions:
%%
%% The above copyright notice and this permission notice shall be included in all
%% copies or substantial portions of the Software.
%%
%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%% SOFTWARE.
%%------------------------------------------------------------------------------
-module(emqtt_http).
-author('feng@emqtt.io').
-include("emqtt.hrl").
-include("emqtt_packet.hrl").
-import(proplists, [get_value/2, get_value/3]).
-export([handle/1]).
handle(Req) ->
case authorized(Req) of
true ->
Path = Req:get(path),
Method = Req:get(method),
handle(Method, Path, Req);
false ->
error_logger:info_msg("Fobbidden"),
Req:respond({401, [], <<"Fobbiden">>})
end.
handle('POST', "/mqtt/publish", Req) ->
Params = mochiweb_request:parse_post(Req),
lager:info("HTTP Publish: ~p~n", [Params]),
Qos = int(get_value("qos", Params, "0")),
Retain = bool(get_value("retain", Params, "0")),
Topic = list_to_binary(get_value("topic", Params)),
Message = list_to_binary(get_value("message", Params)),
case {validate(qos, Qos), validate(topic, Topic)} of
{true, true} ->
emqtt_router:route(
#mqtt_message { qos = Qos,
retain = Retain,
topic = Topic,
payload = Message }),
Req:ok({"text/plan", <<"ok\n">>});
{false, _} ->
Req:respond({400, [], <<"Bad QoS">>});
{_, false} ->
Req:respond({400, [], <<"Bad Topic">>})
end;
handle(_Method, _Path, Req) ->
Req:not_found().
%%------------------------------------------------------------------------------
%% basic authorization
%%------------------------------------------------------------------------------
authorized(Req) ->
case mochiweb_request:get_header_value("Authorization", Req) of
undefined ->
false;
"Basic " ++ BasicAuth ->
emqtt_auth:check(user_passwd(BasicAuth))
end.
user_passwd(BasicAuth) ->
list_to_tuple(binary:split(base64:decode(BasicAuth), <<":">>)).
validate(qos, Qos) ->
(Qos >= ?QOS_0) and (Qos =< ?QOS_2);
validate(topic, Topic) ->
emqtt_topic:validate({name, Topic}).
int(S) -> list_to_integer(S).
bool("0") -> false;
bool("1") -> true.

View File

@ -1,133 +0,0 @@
%%-----------------------------------------------------------------------------
%% Copyright (c) 2012-2015, Feng Lee <feng@emqtt.io>
%%
%% Permission is hereby granted, free of charge, to any person obtaining a copy
%% of this software and associated documentation files (the "Software"), to deal
%% in the Software without restriction, including without limitation the rights
%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%% copies of the Software, and to permit persons to whom the Software is
%% furnished to do so, subject to the following conditions:
%%
%% The above copyright notice and this permission notice shall be included in all
%% copies or substantial portions of the Software.
%%
%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%% SOFTWARE.
%%------------------------------------------------------------------------------
-module(emqtt_message).
-author('feng@emqtt.io').
-include("emqtt.hrl").
-include("emqtt_packet.hrl").
-export([from_packet/1, to_packet/1]).
-export([set_flag/1, set_flag/2, unset_flag/1, unset_flag/2]).
-export([dump/1]).
%%----------------------------------------------------------------------------
-ifdef(use_specs).
-spec( from_packet( mqtt_packet() ) -> mqtt_message() | undefined ).
-spec( to_packet( mqtt_message() ) -> mqtt_packet() ).
-sepc( set_flag(atom(), mqtt_message() ) -> mqtt_message().
-sepc( unset_flag(atom(), mqtt_message() ) -> mqtt_message().
-endif.
%%----------------------------------------------------------------------------
%%
%% @doc message from packet
%%
from_packet(#mqtt_packet{ header = #mqtt_packet_header{ type = ?PUBLISH,
retain = Retain,
qos = Qos,
dup = Dup },
variable = #mqtt_packet_publish{ topic_name = Topic,
packet_id = PacketId },
payload = Payload }) ->
#mqtt_message{ msgid = PacketId,
qos = Qos,
retain = Retain,
dup = Dup,
topic = Topic,
payload = Payload };
from_packet(#mqtt_packet_connect{ will_flag = false }) ->
undefined;
from_packet(#mqtt_packet_connect{ will_retain = Retain,
will_qos = Qos,
will_topic = Topic,
will_msg = Msg }) ->
#mqtt_message{ retain = Retain,
qos = Qos,
topic = Topic,
dup = false,
payload = Msg }.
%%
%% @doc message to packet
%%
to_packet(#mqtt_message{ msgid = MsgId,
qos = Qos,
retain = Retain,
dup = Dup,
topic = Topic,
payload = Payload }) ->
PacketId = if
Qos =:= ?QOS_0 -> undefined;
true -> MsgId
end,
#mqtt_packet{ header = #mqtt_packet_header { type = ?PUBLISH,
qos = Qos,
retain = Retain,
dup = Dup },
variable = #mqtt_packet_publish { topic_name = Topic,
packet_id = PacketId },
payload = Payload }.
%%
%% @doc set dup, retain flag
%%
set_flag(Msg) ->
Msg#mqtt_message{dup = true, retain = true}.
set_flag(dup, Msg = #mqtt_message{dup = false}) ->
Msg#mqtt_message{dup = true};
set_flag(retain, Msg = #mqtt_message{retain = false}) ->
Msg#mqtt_message{retain = true};
set_flag(Flag, Msg) when Flag =:= dup orelse Flag =:= retain -> Msg.
unset_flag(Msg) ->
Msg#mqtt_message{dup = false, retain = false}.
unset_flag(dup, Msg = #mqtt_message{dup = true}) ->
Msg#mqtt_message{dup = false};
unset_flag(retain, Msg = #mqtt_message{retain = true}) ->
Msg#mqtt_message{retain = false};
unset_flag(Flag, Msg) when Flag =:= dup orelse Flag =:= retain -> Msg.
%%
%% @doc dump message
%%
dump(#mqtt_message{msgid= MsgId, qos = Qos, retain = Retain, dup = Dup, topic = Topic}) ->
io_lib:format("Message(MsgId=~p, Qos=~p, Retain=~s, Dup=~s, Topic=~s)",
[ MsgId, Qos, Retain, Dup, Topic ]).

View File

@ -1,115 +0,0 @@
%%-----------------------------------------------------------------------------
%% Copyright (c) 2012-2015, Feng Lee <feng@emqtt.io>
%%
%% Permission is hereby granted, free of charge, to any person obtaining a copy
%% of this software and associated documentation files (the "Software"), to deal
%% in the Software without restriction, including without limitation the rights
%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%% copies of the Software, and to permit persons to whom the Software is
%% furnished to do so, subject to the following conditions:
%%
%% The above copyright notice and this permission notice shall be included in all
%% copies or substantial portions of the Software.
%%
%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%% SOFTWARE.
%%------------------------------------------------------------------------------
-module(emqtt_monitor).
-author('feng@emqtt.io').
-behavior(gen_server).
-export([start_link/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
-record(state, {ok}).
%%--------------------------------------------------------------------
%% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
%% Description: Starts the server
%%--------------------------------------------------------------------
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
%%--------------------------------------------------------------------
%% Function: init(Args) -> {ok, State} |
%% {ok, State, Timeout} |
%% ignore |
%% {stop, Reason}
%% Description: Initiates the server
%%--------------------------------------------------------------------
init([]) ->
erlang:system_monitor(self(), [{long_gc, 5000}, {large_heap, 1000000}, busy_port]),
{ok, #state{}}.
%%--------------------------------------------------------------------
%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
%% {reply, Reply, State, Timeout} |
%% {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, Reply, State} |
%% {stop, Reason, State}
%% Description: Handling call messages
%%--------------------------------------------------------------------
handle_call(Request, _From, State) ->
lager:error("unexpected request: ~p", [Request]),
{reply, {error, unexpected_request}, State}.
%%--------------------------------------------------------------------
%% Function: handle_cast(Msg, State) -> {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State}
%% Description: Handling cast messages
%%--------------------------------------------------------------------
handle_cast(Msg, State) ->
lager:error("unexpected msg: ~p", [Msg]),
{noreply, State}.
%%--------------------------------------------------------------------
%% Function: handle_info(Info, State) -> {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State}
%% Description: Handling all non call/cast messages
%%--------------------------------------------------------------------
handle_info({monitor, GcPid, long_gc, Info}, State) ->
lager:error("long_gc: gcpid = ~p, ~p ~n ~p", [GcPid, process_info(GcPid,
[registered_name, memory, message_queue_len,heap_size,total_heap_size]), Info]),
{noreply, State};
handle_info({monitor, GcPid, large_heap, Info}, State) ->
lager:error("large_heap: gcpid = ~p,~p ~n ~p", [GcPid, process_info(GcPid,
[registered_name, memory, message_queue_len,heap_size,total_heap_size]), Info]),
{noreply, State};
handle_info({monitor, SusPid, busy_port, Port}, State) ->
lager:error("busy_port: suspid = ~p, port = ~p", [process_info(SusPid,
[registered_name, memory, message_queue_len,heap_size,total_heap_size]), Port]),
{noreply, State};
handle_info(Info, State) ->
lager:error("unexpected info: ~p", [Info]),
{noreply, State}.
%%--------------------------------------------------------------------
%% Function: terminate(Reason, State) -> void()
%% Description: This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any necessary
%% cleaning up. When it returns, the gen_server terminates with Reason.
%% The return value is ignored.
%%--------------------------------------------------------------------
terminate(_Reason, _State) ->
ok.
%%--------------------------------------------------------------------
%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
%% Description: Convert process state when code is changed
%%--------------------------------------------------------------------
code_change(_OldVsn, State, _Extra) ->
{ok, State}.

View File

@ -1 +0,0 @@
-module(emqtt_plugin).

View File

@ -1,74 +0,0 @@
%%-----------------------------------------------------------------------------
%% Copyright (c) 2012-2015, Feng Lee <feng@emqtt.io>
%%
%% Permission is hereby granted, free of charge, to any person obtaining a copy
%% of this software and associated documentation files (the "Software"), to deal
%% in the Software without restriction, including without limitation the rights
%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%% copies of the Software, and to permit persons to whom the Software is
%% furnished to do so, subject to the following conditions:
%%
%% The above copyright notice and this permission notice shall be included in all
%% copies or substantial portions of the Software.
%%
%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%% SOFTWARE.
%%------------------------------------------------------------------------------
-module(emqtt_queue).
-include("emqtt_packet.hrl").
-export([new/1, new/2, in/3, all/1, clear/1]).
%%----------------------------------------------------------------------------
-ifdef(use_specs).
-type(mqtt_queue() :: #mqtt_queue_wrapper{}).
-spec(new(non_neg_intger()) -> mqtt_queue()).
-spec(in(binary(), mqtt_message(), mqtt_queue()) -> mqtt_queue()).
-spec(all(mqtt_queue()) -> list()).
-spec(clear(mqtt_queue()) -> mqtt_queue()).
-endif.
%%----------------------------------------------------------------------------
-define(DEFAULT_MAX_LEN, 1000).
-record(mqtt_queue_wrapper, { queue = queue:new(), max_len = ?DEFAULT_MAX_LEN, store_qos0 = false }).
new(MaxLen) -> #mqtt_queue_wrapper{ max_len = MaxLen }.
new(MaxLen, StoreQos0) -> #mqtt_queue_wrapper{ max_len = MaxLen, store_qos0 = StoreQos0 }.
in(ClientId, Message = #mqtt_message{qos = Qos},
Wrapper = #mqtt_queue_wrapper{ queue = Queue, max_len = MaxLen}) ->
case queue:len(Queue) < MaxLen of
true ->
Wrapper#mqtt_queue_wrapper{ queue = queue:in(Message, Queue) };
false -> % full
if
Qos =:= ?QOS_0 ->
lager:warning("Queue ~s drop qos0 message: ~p", [ClientId, Message]),
Wrapper;
true ->
{{value, Msg}, Queue1} = queue:drop(Queue),
lager:warning("Queue ~s drop message: ~p", [ClientId, Msg]),
Wrapper#mqtt_queue_wrapper{ queue = Queue1 }
end
end.
all(#mqtt_queue_wrapper { queue = Queue }) -> queue:to_list(Queue).
clear(Queue) -> Queue#mqtt_queue_wrapper{ queue = queue:new() }.

View File

@ -1,41 +0,0 @@
%%-----------------------------------------------------------------------------
%% Copyright (c) 2012-2015, Feng Lee <feng@emqtt.io>
%%
%% Permission is hereby granted, free of charge, to any person obtaining a copy
%% of this software and associated documentation files (the "Software"), to deal
%% in the Software without restriction, including without limitation the rights
%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%% copies of the Software, and to permit persons to whom the Software is
%% furnished to do so, subject to the following conditions:
%%
%% The above copyright notice and this permission notice shall be included in all
%% copies or substantial portions of the Software.
%%
%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%% SOFTWARE.
%%------------------------------------------------------------------------------
-module(emqtt_queue_sup).
-author('feng@emqtt.io').
-behavior(supervisor).
-export([start_link/0, start_queue/0, init/1]).
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
start_queue() ->
supervisor:start_child(?MODULE, []).
init([]) ->
{ok, {{simple_one_for_one, 0, 1},
[{queue, {emqtt_queue, start_link, []},
transient, 10000, worker, [emqtt_queue]}]}}.

View File

@ -1,57 +0,0 @@
%%-----------------------------------------------------------------------------
%% Copyright (c) 2012-2015, Feng Lee <feng@emqtt.io>
%%
%% Permission is hereby granted, free of charge, to any person obtaining a copy
%% of this software and associated documentation files (the "Software"), to deal
%% in the Software without restriction, including without limitation the rights
%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%% copies of the Software, and to permit persons to whom the Software is
%% furnished to do so, subject to the following conditions:
%%
%% The above copyright notice and this permission notice shall be included in all
%% copies or substantial portions of the Software.
%%
%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%% SOFTWARE.
%%------------------------------------------------------------------------------
-module(emqtt_session_sup).
-author('feng@emqtt.io').
-behavior(supervisor).
-export([start_link/1, start_session/2]).
-export([init/1]).
%%----------------------------------------------------------------------------
-ifdef(use_specs).
-spec(start_link/1 :: (list(tuple())) -> {ok, pid()}).
-spec(start_session/2 :: (binary(), pid()) -> {ok, pid()}).
-endif.
%%----------------------------------------------------------------------------
start_link(SessOpts) ->
supervisor:start_link({local, ?MODULE}, ?MODULE, [SessOpts]).
start_session(ClientId, ClientPid) ->
supervisor:start_child(?MODULE, [ClientId, ClientPid]).
init([SessOpts]) ->
{ok, {{simple_one_for_one, 0, 1},
[{session, {emqtt_session, start_link, [SessOpts]},
transient, 10000, worker, [emqtt_session]}]}}.

View File

@ -1,64 +0,0 @@
%%-----------------------------------------------------------------------------
%% Copyright (c) 2012-2015, Feng Lee <feng@emqtt.io>
%%
%% Permission is hereby granted, free of charge, to any person obtaining a copy
%% of this software and associated documentation files (the "Software"), to deal
%% in the Software without restriction, including without limitation the rights
%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%% copies of the Software, and to permit persons to whom the Software is
%% furnished to do so, subject to the following conditions:
%%
%% The above copyright notice and this permission notice shall be included in all
%% copies or substantial portions of the Software.
%%
%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%% SOFTWARE.
%%------------------------------------------------------------------------------
-module(emqtt_sup).
-author('feng@emqtt.io').
-include("emqtt.hrl").
-behaviour(supervisor).
%% API
-export([start_link/0,
start_child/1,
start_child/2]).
%% Supervisor callbacks
-export([init/1]).
%% Helper macro for declaring children of supervisor
-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}).
%% ===================================================================
%% API functions
%% ===================================================================
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
start_child(ChildSpec) when is_tuple(ChildSpec) ->
supervisor:start_child(?MODULE, ChildSpec).
%%
%% start_child(Mod::atom(), Type::type()) -> {ok, pid()}
%% @type type() = worker | supervisor
%%
start_child(Mod, Type) when is_atom(Mod) and is_atom(Type) ->
supervisor:start_child(?MODULE, ?CHILD(Mod, Type)).
%% ===================================================================
%% Supervisor callbacks
%% ===================================================================
init([]) ->
{ok, { {one_for_all, 5, 10}, [] } }.

View File

@ -1,26 +0,0 @@
%%-----------------------------------------------------------------------------
%% Copyright (c) 2012-2015, Feng Lee <feng@emqtt.io>
%%
%% Permission is hereby granted, free of charge, to any person obtaining a copy
%% of this software and associated documentation files (the "Software"), to deal
%% in the Software without restriction, including without limitation the rights
%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%% copies of the Software, and to permit persons to whom the Software is
%% furnished to do so, subject to the following conditions:
%%
%% The above copyright notice and this permission notice shall be included in all
%% copies or substantial portions of the Software.
%%
%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%% SOFTWARE.
%%------------------------------------------------------------------------------
-module(emqtt_throttle).

View File

@ -1,23 +0,0 @@
%%-----------------------------------------------------------------------------
%% Copyright (c) 2012-2015, Feng Lee <feng@emqtt.io>
%%
%% Permission is hereby granted, free of charge, to any person obtaining a copy
%% of this software and associated documentation files (the "Software"), to deal
%% in the Software without restriction, including without limitation the rights
%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%% copies of the Software, and to permit persons to whom the Software is
%% furnished to do so, subject to the following conditions:
%%
%% The above copyright notice and this permission notice shall be included in all
%% copies or substantial portions of the Software.
%%
%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%% SOFTWARE.
%%------------------------------------------------------------------------------
-module(emqtt_vm).

View File

@ -59,6 +59,13 @@
-type mqtt_session() :: #mqtt_session{}.
%%------------------------------------------------------------------------------
%% MQTT Retained Message
%%------------------------------------------------------------------------------
-record(mqtt_retained, {topic, qos, payload}).
-type mqtt_retained() :: #mqtt_retained{}.
%%------------------------------------------------------------------------------
%% MQTT User Management
%%------------------------------------------------------------------------------

View File

@ -1,12 +1,12 @@
{application, emqtt,
{application, emqttd,
[
{description, "Erlang MQTT Broker"},
{vsn, "0.4.0"},
{vsn, "0.5.0"},
{modules, []},
{registered, []},
{applications, [kernel,
stdlib]},
{mod, {emqtt_app, []}},
{mod, {emqttd_app, []}},
{env, []}
]}.

View File

@ -20,11 +20,13 @@
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% emqtt main module.
%%% emqttd main module.
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqtt).
-module(emqttd).
-author('feng@emqtt.io').
-export([start/0, open/1]).
@ -36,22 +38,35 @@
{nodelay, true}
]).
-type listener() :: {atom(), inet:port_number(), [esockd:option()]}.
%%------------------------------------------------------------------------------
%% @doc
%% Start emqttd application.
%%
%% @end
%%------------------------------------------------------------------------------
-spec start() -> ok | {error, any()}.
start() ->
application:start(emqtt).
application:start(emqttd).
-spec open([listener()] | listener()) -> any().
open(Listeners) when is_list(Listeners) ->
[open(Listener) || Listener <- Listeners];
%% open mqtt port
open({mqtt, Port, Options}) ->
MFArgs = {emqtt_client, start_link, []},
esockd:open(mqtt, Port, emqtt_opts:merge(?MQTT_SOCKOPTS, Options) , MFArgs);
MFArgs = {emqttd_client, start_link, []},
esockd:open(mqtt, Port, emqttd_opts:merge(?MQTT_SOCKOPTS, Options) , MFArgs);
%% open mqtt(SSL) port
open({mqtts, Port, Options}) ->
MFArgs = {emqtt_client, start_link, []},
esockd:open(mqtts, Port, emqtt_opts:merge(?MQTT_SOCKOPTS, Options) , MFArgs);
MFArgs = {emqttd_client, start_link, []},
esockd:open(mqtts, Port, emqttd_opts:merge(?MQTT_SOCKOPTS, Options) , MFArgs);
%% open http port
open({http, Port, Options}) ->
MFArgs = {emqtt_http, handle, []},
MFArgs = {emqttd_http, handle, []},
mochiweb:start_http(Port, Options, MFArgs).

View File

@ -0,0 +1,155 @@
%%%-----------------------------------------------------------------------------
%%% @Copyright (C) 2012-2015, Feng Lee <feng@emqtt.io>
%%%
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
%%% of this software and associated documentation files (the "Software"), to deal
%%% in the Software without restriction, including without limitation the rights
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%%% copies of the Software, and to permit persons to whom the Software is
%%% furnished to do so, subject to the following conditions:
%%%
%%% The above copyright notice and this permission notice shall be included in all
%%% copies or substantial portions of the Software.
%%%
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% emqttd application.
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqttd_app).
-author('feng@emqtt.io').
-define(PRINT_MSG(Msg), io:format(Msg)).
-define(PRINT(Format, Args), io:format(Format, Args)).
-behaviour(application).
%% Application callbacks
-export([start/2, stop/1]).
%%%=============================================================================
%%% Application callbacks
%%%=============================================================================
%%------------------------------------------------------------------------------
%% @private
%% @doc
%% This function is called whenever an application is started using
%% application:start/[1,2], and should start the processes of the
%% application. If the application is structured according to the OTP
%% design principles as a supervision tree, this means starting the
%% top supervisor of the tree.
%%
%% @end
%%------------------------------------------------------------------------------
-spec start(StartType, StartArgs) -> {ok, pid()} | {ok, pid(), State} | {error, Reason} when
StartType :: normal | {takeover, node()} | {failover, node()},
StartArgs :: term(),
State :: term(),
Reason :: term().
start(_StartType, _StartArgs) ->
print_banner(),
{ok, Sup} = emqttd_sup:start_link(),
start_servers(Sup),
ok = emqttd_mnesia:wait(),
{ok, Listeners} = application:get_env(listen),
emqttd:open(Listeners),
register(emqtt, self()),
print_vsn(),
{ok, Sup}.
print_banner() ->
?PRINT("starting emqttd on node '~s'~n", [node()]).
print_vsn() ->
{ok, Vsn} = application:get_key(vsn),
{ok, Desc} = application:get_key(description),
?PRINT("~s ~s is running now~n", [Desc, Vsn]).
start_servers(Sup) ->
{ok, SessOpts} = application:get_env(session),
{ok, RetainOpts} = application:get_env(retain),
{ok, BrokerOpts} = application:get_env(broker),
{ok, MetricOpts} = application:get_env(metrics),
lists:foreach(
fun({Name, F}) when is_function(F) ->
?PRINT("~s is starting...", [Name]),
F(),
?PRINT_MSG("[done]~n");
({Name, Server}) ->
?PRINT("~s is starting...", [Name]),
start_child(Sup, Server),
?PRINT_MSG("[done]~n");
({Name, Server, Opts}) ->
?PRINT("~s is starting...", [ Name]),
start_child(Sup, Server, Opts),
?PRINT_MSG("[done]~n")
end,
[{"emqttd config", emqttd_config},
{"emqttd server", emqttd_server, RetainOpts},
{"emqttd client manager", emqttd_cm},
{"emqttd session manager", emqttd_sm},
{"emqttd session supervisor", {supervisor, emqttd_session_sup}, SessOpts},
{"emqttd auth", emqttd_auth},
{"emqttd pubsub", emqttd_pubsub},
{"emqttd router", emqttd_router},
{"emqttd broker", emqttd_broker, BrokerOpts},
{"emqttd metrics", emqttd_metrics, MetricOpts},
{"emqttd monitor", emqttd_monitor}
]).
start_child(Sup, {supervisor, Name}) ->
supervisor:start_child(Sup, supervisor_spec(Name));
start_child(Sup, Name) when is_atom(Name) ->
{ok, _ChiId} = supervisor:start_child(Sup, worker_spec(Name)).
start_child(Sup, {supervisor, Name}, Opts) ->
supervisor:start_child(Sup, supervisor_spec(Name, Opts));
start_child(Sup, Name, Opts) when is_atom(Name) ->
{ok, _ChiId} = supervisor:start_child(Sup, worker_spec(Name, Opts)).
%%TODO: refactor...
supervisor_spec(Name) ->
{Name,
{Name, start_link, []},
permanent, infinity, supervisor, [Name]}.
supervisor_spec(Name, Opts) ->
{Name,
{Name, start_link, [Opts]},
permanent, infinity, supervisor, [Name]}.
worker_spec(Name) ->
{Name,
{Name, start_link, []},
permanent, 5000, worker, [Name]}.
worker_spec(Name, Opts) ->
{Name,
{Name, start_link, [Opts]},
permanent, 5000, worker, [Name]}.
%%------------------------------------------------------------------------------
%% @private
%% @doc
%% This function is called whenever an application has stopped. It
%% is intended to be the opposite of Module:start/2 and should do
%% any necessary cleaning up. The return value is ignored.
%%
%% @end
%%------------------------------------------------------------------------------
-spec stop(State :: term()) -> term().
stop(_State) ->
ok.

View File

@ -0,0 +1,97 @@
%%%-----------------------------------------------------------------------------
%%% @Copyright (C) 2012-2015, Feng Lee <feng@emqtt.io>
%%%
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
%%% of this software and associated documentation files (the "Software"), to deal
%%% in the Software without restriction, including without limitation the rights
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%%% copies of the Software, and to permit persons to whom the Software is
%%% furnished to do so, subject to the following conditions:
%%%
%%% The above copyright notice and this permission notice shall be included in all
%%% copies or substantial portions of the Software.
%%%
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% emqttd authentication.
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqttd_auth).
-author('feng@emqtt.io').
-include("emqttd.hrl").
-export([start_link/0,
add/2,
check/1, check/2,
delete/1]).
-behavior(gen_server).
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3]).
-define(TAB, ?MODULE).
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-spec check({Usename :: binary(), Password :: binary()}) -> true | false.
check({Username, Password}) ->
execute(check, [Username, Password]).
-spec check(Usename :: binary(), Password :: binary()) -> true | false.
check(Username, Password) ->
execute(check, [Username, Password]).
-spec add(Usename :: binary(), Password :: binary()) -> ok.
add(Username, Password) ->
execute(add, [Username, Password]).
-spec delete(Username :: binary()) -> ok.
delete(Username) ->
execute(delete, [Username]).
execute(F, Args) ->
[{_, M}] = ets:lookup(?TAB, mod),
apply(M, F, Args).
init([]) ->
{ok, {Name, Opts}} = application:get_env(auth),
AuthMod = authmod(Name),
ok = AuthMod:init(Opts),
ets:new(?TAB, [named_table, protected]),
ets:insert(?TAB, {mod, AuthMod}),
{ok, undefined}.
authmod(Name) when is_atom(Name) ->
list_to_atom(lists:concat(["emqttd_auth_", Name])).
handle_call(Req, _From, State) ->
{stop, {badreq, Req}, State}.
handle_cast(Msg, State) ->
{stop, {badmsg, Msg}, State}.
handle_info(Info, State) ->
{stop, {badinfo, Info}, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.

View File

@ -0,0 +1,40 @@
%%%-----------------------------------------------------------------------------
%%% @Copyright (C) 2012-2015, Feng Lee <feng@emqtt.io>
%%%
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
%%% of this software and associated documentation files (the "Software"), to deal
%%% in the Software without restriction, including without limitation the rights
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%%% copies of the Software, and to permit persons to whom the Software is
%%% furnished to do so, subject to the following conditions:
%%%
%%% The above copyright notice and this permission notice shall be included in all
%%% copies or substantial portions of the Software.
%%%
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% emqttd anonymous authentication.
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqttd_auth_anonymous).
-author('feng@emqtt.io').
-export([init/1, add/2, check/2, delete/1]).
init(_Opts) -> ok.
check(_, _) -> true.
add(_, _) -> ok.
delete(_Username) -> ok.

View File

@ -0,0 +1,65 @@
%%%-----------------------------------------------------------------------------
%%% @Copyright (C) 2012-2015, Feng Lee <feng@emqtt.io>
%%%
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
%%% of this software and associated documentation files (the "Software"), to deal
%%% in the Software without restriction, including without limitation the rights
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%%% copies of the Software, and to permit persons to whom the Software is
%%% furnished to do so, subject to the following conditions:
%%%
%%% The above copyright notice and this permission notice shall be included in all
%%% copies or substantial portions of the Software.
%%%
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% emqttd internal authentication.
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqttd_auth_internal).
-author('feng@emqtt.io').
-include("emqttd.hrl").
-export([init/1, add/2, check/2, delete/1]).
-define(USER_TAB, mqtt_user).
init(_Opts) ->
mnesia:create_table(?USER_TAB, [
{ram_copies, [node()]},
{attributes, record_info(fields, mqtt_user)}]),
mnesia:add_table_copy(?USER_TAB, node(), ram_copies),
ok.
check(undefined, _) -> false;
check(_, undefined) -> false;
check(Username, Password) when is_binary(Username), is_binary(Password) ->
PasswdHash = crypto:hash(md5, Password),
case mnesia:dirty_read(?USER_TAB, Username) of
[#mqtt_user{passwdhash=PasswdHash}] -> true;
_ -> false
end.
add(Username, Password) when is_binary(Username) and is_binary(Password) ->
mnesia:dirty_write(
#mqtt_user{
username=Username,
passwdhash=crypto:hash(md5, Password)
}
).
delete(Username) when is_binary(Username) ->
mnesia:dirty_delete(?USER_TAB, Username).

View File

@ -0,0 +1,90 @@
%%%-----------------------------------------------------------------------------
%%% @Copyright (C) 2012-2015, Feng Lee <feng@emqtt.io>
%%%
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
%%% of this software and associated documentation files (the "Software"), to deal
%%% in the Software without restriction, including without limitation the rights
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%%% copies of the Software, and to permit persons to whom the Software is
%%% furnished to do so, subject to the following conditions:
%%%
%%% The above copyright notice and this permission notice shall be included in all
%%% copies or substantial portions of the Software.
%%%
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% emqttd bridge.
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqttd_bridge).
-author('feng@emqtt.io').
-behaviour(gen_server).
-include("emqttd_packet.hrl").
%% API Function Exports
-export([start_link/2]).
%% gen_server Function Exports
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-record(state, {node, local_topic, status = running}).
%%%=============================================================================
%%% API
%%%=============================================================================
start_link(Node, LocalTopic) ->
gen_server:start_link(?MODULE, [Node, LocalTopic], []).
%%%=============================================================================
%%% gen_server callbacks
%%%=============================================================================
init([Node, LocalTopic]) ->
emqttd_pubsub:subscribe({LocalTopic, ?QOS_0}, self()),
%%TODO: monitor nodes...
{ok, #state{node = Node, local_topic = LocalTopic}}.
handle_call(_Request, _From, State) ->
{reply, ok, State}.
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info({nodedown, Node}, State = #state{node = Node}) ->
%%....
{noreply, State#state{status = down}};
handle_info({dispatch, {_From, Msg}}, State = #state{node = Node}) ->
%%TODO: CAST
rpc:call(Node, emqttd_router, route, [Msg]),
{noreply, State};
handle_info(Info, State) ->
lager:error("Unexpected Info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%%=============================================================================
%%% Internal functions
%%%=============================================================================

View File

@ -0,0 +1,97 @@
%%%-----------------------------------------------------------------------------
%%% @Copyright (C) 2012-2015, Feng Lee <feng@emqtt.io>
%%%
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
%%% of this software and associated documentation files (the "Software"), to deal
%%% in the Software without restriction, including without limitation the rights
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%%% copies of the Software, and to permit persons to whom the Software is
%%% furnished to do so, subject to the following conditions:
%%%
%%% The above copyright notice and this permission notice shall be included in all
%%% copies or substantial portions of the Software.
%%%
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% emqttd bridge supervisor.
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqttd_bridge_sup).
-author('feng@emqtt.io').
-behavior(supervisor).
-export([start_link/0, start_bridge/2, stop_bridge/2]).
-export([init/1]).
%%%=============================================================================
%%% API
%%%=============================================================================
%%------------------------------------------------------------------------------
%% @doc
%% Start bridge supervisor.
%%
%% @end
%%------------------------------------------------------------------------------
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
%%------------------------------------------------------------------------------
%% @doc
%% Start a bridge.
%%
%% @end
%%------------------------------------------------------------------------------
-spec start_bridge(atom(), binary()) -> {ok, pid()} | {error, any()}.
start_bridge(Node, LocalTopic) when is_atom(Node) and is_binary(LocalTopic) ->
%%TODO: mv this code to emqttd_bridge???
case net_kernel:connect_node(Node) of
true ->
supervisor:start_child(?MODULE, bridge_spec(Node, LocalTopic));
false ->
{error, {cannot_connect, Node}}
end.
%%------------------------------------------------------------------------------
%% @doc
%% Stop a bridge.
%%
%% @end
%%------------------------------------------------------------------------------
-spec stop_bridge(atom(), binary()) -> {ok, pid()} | ok.
stop_bridge(Node, LocalTopic) ->
ChildId = bridge_id(Node, LocalTopic),
case supervisor:terminate_child(ChildId) of
ok ->
supervisor:delete_child(?MODULE, ChildId);
{error, Reason} ->
{error, Reason}
end.
%%%=============================================================================
%%% Supervisor callbacks
%%%=============================================================================
init([]) ->
{ok, {{one_for_one, 10, 100}, []}}.
bridge_id(Node, LocalTopic) ->
{bridge, Node, LocalTopic}.
bridge_spec(Node, LocalTopic) ->
ChildId = bridge_id(Node, LocalTopic),
{ChildId, {emqttd_bridge, start_link, [Node, LocalTopic]},
transient, 10000, worker, [emqttd_bridge]}.

View File

@ -20,21 +20,21 @@
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% emqtt broker.
%%% emqttd broker.
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqtt_broker).
-module(emqttd_broker).
-include("emqtt_packet.hrl").
-include("emqttd_packet.hrl").
-include("emqtt_systop.hrl").
-include("emqttd_systop.hrl").
-behaviour(gen_server).
-define(SERVER, ?MODULE).
-define(BROKER_TAB, ?MODULE).
-define(BROKER_TAB, mqtt_broker).
%% API Function Exports
-export([start_link/1]).
@ -56,7 +56,7 @@
%%------------------------------------------------------------------------------
%% @doc
%% Start emqtt broker.
%% Start emqttd broker.
%%
%% @end
%%------------------------------------------------------------------------------
@ -72,7 +72,7 @@ start_link(Options) ->
%%------------------------------------------------------------------------------
-spec version() -> string().
version() ->
{ok, Version} = application:get_key(emqtt, vsn), Version.
{ok, Version} = application:get_key(emqttd, vsn), Version.
%%------------------------------------------------------------------------------
%% @doc
@ -82,7 +82,7 @@ version() ->
%%------------------------------------------------------------------------------
-spec sysdescr() -> string().
sysdescr() ->
{ok, Descr} = application:get_key(emqtt, description), Descr.
{ok, Descr} = application:get_key(emqttd, description), Descr.
%%------------------------------------------------------------------------------
%% @doc
@ -191,15 +191,15 @@ systop(Name) when is_atom(Name) ->
list_to_binary(lists:concat(["$SYS/brokers/", node(), "/", Name])).
create(Topic) ->
emqtt_pubsub:create(Topic).
emqttd_pubsub:create(Topic).
retain(Topic, Payload) when is_binary(Payload) ->
emqtt_router:route(#mqtt_message{retain = true,
topic = Topic,
payload = Payload}).
emqttd_router:route(#mqtt_message{retain = true,
topic = Topic,
payload = Payload}).
publish(Topic, Payload) when is_binary(Payload) ->
emqtt_router:route(#mqtt_message{topic = Topic,
emqttd_router:route(#mqtt_message{topic = Topic,
payload = Payload}).
uptime(#state{started_at = Ts}) ->

View File

@ -1,25 +1,30 @@
%%-----------------------------------------------------------------------------
%% Copyright (c) 2012-2015, Feng Lee <feng@emqtt.io>
%%
%% Permission is hereby granted, free of charge, to any person obtaining a copy
%% of this software and associated documentation files (the "Software"), to deal
%% in the Software without restriction, including without limitation the rights
%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%% copies of the Software, and to permit persons to whom the Software is
%% furnished to do so, subject to the following conditions:
%%
%% The above copyright notice and this permission notice shall be included in all
%% copies or substantial portions of the Software.
%%
%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%% SOFTWARE.
%%------------------------------------------------------------------------------
-module(emqtt_client).
%%%-----------------------------------------------------------------------------
%%% @Copyright (C) 2012-2015, Feng Lee <feng@emqtt.io>
%%%
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
%%% of this software and associated documentation files (the "Software"), to deal
%%% in the Software without restriction, including without limitation the rights
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%%% copies of the Software, and to permit persons to whom the Software is
%%% furnished to do so, subject to the following conditions:
%%%
%%% The above copyright notice and this permission notice shall be included in all
%%% copies or substantial portions of the Software.
%%%
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% emqttd client.
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqttd_client).
-author('feng@emqtt.io').
@ -34,9 +39,9 @@
code_change/3,
terminate/2]).
-include("emqtt.hrl").
-include("emqttd.hrl").
-include("emqtt_packet.hrl").
-include("emqttd_packet.hrl").
%%Client State...
-record(state, {transport,
@ -60,8 +65,8 @@ info(Pid) ->
init(SockArgs = {Transport, Sock, _SockFun}) ->
%transform if ssl.
{ok, NewSock} = esockd_connection:accept(SockArgs),
{ok, Peername} = emqtt_net:peer_string(Sock),
{ok, ConnStr} = emqtt_net:connection_string(Sock, inbound),
{ok, Peername} = emqttd_net:peer_string(Sock),
{ok, ConnStr} = emqttd_net:connection_string(Sock, inbound),
lager:info("Connect from ~s", [ConnStr]),
State = control_throttle(#state{transport = Transport,
socket = NewSock,
@ -70,14 +75,14 @@ init(SockArgs = {Transport, Sock, _SockFun}) ->
await_recv = false,
conn_state = running,
conserve = false,
parse_state = emqtt_parser:init(),
proto_state = emqtt_protocol:init(Transport, NewSock, Peername)}),
parse_state = emqttd_parser:init(),
proto_state = emqttd_protocol:init(Transport, NewSock, Peername)}),
gen_server:enter_loop(?MODULE, [], State, 10000).
%%TODO: Not enough...
handle_call(info, _From, State = #state{conn_name=ConnName,
proto_state = ProtoState}) ->
{reply, [{conn_name, ConnName} | emqtt_protocol:info(ProtoState)], State};
{reply, [{conn_name, ConnName} | emqttd_protocol:info(ProtoState)], State};
handle_call(Req, _From, State) ->
{stop, {badreq, Req}, State}.
@ -92,18 +97,18 @@ handle_info({stop, duplicate_id, _NewPid}, State=#state{proto_state = ProtoState
conn_name=ConnName}) ->
%% TODO: to...
%% need transfer data???
%% emqtt_client:transfer(NewPid, Data),
%% emqttd_client:transfer(NewPid, Data),
lager:error("Shutdown for duplicate clientid: ~s, conn:~s",
[emqtt_protocol:client_id(ProtoState), ConnName]),
[emqttd_protocol:client_id(ProtoState), ConnName]),
stop({shutdown, duplicate_id}, State);
%%TODO: ok??
handle_info({dispatch, {From, Message}}, #state{proto_state = ProtoState} = State) ->
{ok, ProtoState1} = emqtt_protocol:send({From, Message}, ProtoState),
{ok, ProtoState1} = emqttd_protocol:send({From, Message}, ProtoState),
{noreply, State#state{proto_state = ProtoState1}};
handle_info({redeliver, {?PUBREL, PacketId}}, #state{proto_state = ProtoState} = State) ->
{ok, ProtoState1} = emqtt_protocol:redeliver({?PUBREL, PacketId}, ProtoState),
{ok, ProtoState1} = emqttd_protocol:redeliver({?PUBREL, PacketId}, ProtoState),
{noreply, State#state{proto_state = ProtoState1}};
handle_info({inet_reply, _Ref, ok}, State) ->
@ -111,7 +116,7 @@ handle_info({inet_reply, _Ref, ok}, State) ->
handle_info({inet_async, Sock, _Ref, {ok, Data}}, State = #state{peer_name = PeerName, socket = Sock}) ->
lager:debug("RECV from ~s: ~p", [PeerName, Data]),
emqtt_metrics:inc('bytes/received', size(Data)),
emqttd_metrics:inc('bytes/received', size(Data)),
process_received_bytes(Data,
control_throttle(State #state{await_recv = false}));
@ -124,11 +129,11 @@ handle_info({inet_reply, _Sock, {error, Reason}}, State = #state{peer_name = Pee
handle_info({keepalive, start, TimeoutSec}, State = #state{transport = Transport, socket = Socket}) ->
lager:info("Client ~s: Start KeepAlive with ~p seconds", [State#state.peer_name, TimeoutSec]),
KeepAlive = emqtt_keepalive:new({Transport, Socket}, TimeoutSec, {keepalive, timeout}),
KeepAlive = emqttd_keepalive:new({Transport, Socket}, TimeoutSec, {keepalive, timeout}),
{noreply, State#state{ keepalive = KeepAlive }};
handle_info({keepalive, timeout}, State = #state{keepalive = KeepAlive}) ->
case emqtt_keepalive:resume(KeepAlive) of
case emqttd_keepalive:resume(KeepAlive) of
timeout ->
lager:info("Client ~s: Keepalive Timeout!", [State#state.peer_name]),
stop({shutdown, keepalive_timeout}, State#state{keepalive = undefined});
@ -143,11 +148,11 @@ handle_info(Info, State = #state{peer_name = PeerName}) ->
terminate(Reason, #state{peer_name = PeerName, keepalive = KeepAlive, proto_state = ProtoState}) ->
lager:info("Client ~s: ~p terminated, reason: ~p~n", [PeerName, self(), Reason]),
emqtt_keepalive:cancel(KeepAlive),
emqttd_keepalive:cancel(KeepAlive),
case {ProtoState, Reason} of
{undefined, _} -> ok;
{_, {shutdown, Error}} ->
emqtt_protocol:shutdown(Error, ProtoState);
emqttd_protocol:shutdown(Error, ProtoState);
{_, _} ->
ok
end,
@ -165,16 +170,16 @@ process_received_bytes(<<>>, State) ->
process_received_bytes(Bytes, State = #state{parse_state = ParseState,
proto_state = ProtoState,
conn_name = ConnStr}) ->
case emqtt_parser:parse(Bytes, ParseState) of
case emqttd_parser:parse(Bytes, ParseState) of
{more, ParseState1} ->
{noreply,
control_throttle(State #state{parse_state = ParseState1}),
hibernate};
{ok, Packet, Rest} ->
received_stats(Packet),
case emqtt_protocol:received(Packet, ProtoState) of
case emqttd_protocol:received(Packet, ProtoState) of
{ok, ProtoState1} ->
process_received_bytes(Rest, State#state{parse_state = emqtt_parser:init(),
process_received_bytes(Rest, State#state{parse_state = emqttd_parser:init(),
proto_state = ProtoState1});
{error, Error} ->
lager:error("MQTT protocol error ~p for connection ~p~n", [Error, ConnStr]),
@ -214,21 +219,21 @@ stop(Reason, State ) ->
{stop, Reason, State}.
received_stats(?PACKET(Type)) ->
emqtt_metrics:inc('packets/received'),
emqttd_metrics:inc('packets/received'),
inc(Type).
inc(?CONNECT) ->
emqtt_metrics:inc('packets/connect');
emqttd_metrics:inc('packets/connect');
inc(?PUBLISH) ->
emqtt_metrics:inc('messages/received'),
emqtt_metrics:inc('packets/publish/received');
emqttd_metrics:inc('messages/received'),
emqttd_metrics:inc('packets/publish/received');
inc(?SUBSCRIBE) ->
emqtt_metrics:inc('packets/subscribe');
emqttd_metrics:inc('packets/subscribe');
inc(?UNSUBSCRIBE) ->
emqtt_metrics:inc('packets/unsubscribe');
emqttd_metrics:inc('packets/unsubscribe');
inc(?PINGREQ) ->
emqtt_metrics:inc('packets/pingreq');
emqttd_metrics:inc('packets/pingreq');
inc(?DISCONNECT) ->
emqtt_metrics:inc('packets/disconnect');
emqttd_metrics:inc('packets/disconnect');
inc(_) ->
ignore.

View File

@ -24,7 +24,7 @@
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqtt_cm).
-module(emqttd_cm).
-author('feng@emqtt.io').
@ -32,7 +32,7 @@
-define(SERVER, ?MODULE).
-define(CLIENT_TAB, emqtt_client).
-define(CLIENT_TAB, mqtt_client).
%% API Exports
-export([start_link/0]).
@ -42,7 +42,6 @@
-export([getstats/0]).
%% gen_server Function Exports
-export([init/1,
handle_call/3,
handle_cast/2,
@ -74,7 +73,7 @@ start_link() ->
%%------------------------------------------------------------------------------
-spec lookup(ClientId :: binary()) -> pid() | undefined.
lookup(ClientId) when is_binary(ClientId) ->
case ets:lookup(emqtt_client, ClientId) of
case ets:lookup(?CLIENT_TAB, ClientId) of
[{_, Pid, _}] -> Pid;
[] -> undefined
end.
@ -175,10 +174,10 @@ insert(ClientId, Pid) ->
setstats(State = #state{max = Max}) ->
Count = ets:info(?CLIENT_TAB, size),
emqtt_broker:setstat('clients/count', Count),
emqttd_broker:setstat('clients/count', Count),
if
Count > Max ->
emqtt_broker:setstat('clients/max', Count),
emqttd_broker:setstat('clients/max', Count),
State#state{max = Count};
true ->
State

View File

@ -0,0 +1,78 @@
%%%-----------------------------------------------------------------------------
%%% @Copyright (C) 2012-2015, Feng Lee <feng@emqtt.io>
%%%
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
%%% of this software and associated documentation files (the "Software"), to deal
%%% in the Software without restriction, including without limitation the rights
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%%% copies of the Software, and to permit persons to whom the Software is
%%% furnished to do so, subject to the following conditions:
%%%
%%% The above copyright notice and this permission notice shall be included in all
%%% copies or substantial portions of the Software.
%%%
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% emqttd config manager.
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqttd_config).
-define(SERVER, ?MODULE).
-behaviour(gen_server).
%% API Function Exports
-export([start_link/0, lookup/1]).
%% gen_server Function Exports
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
%%%=============================================================================
%%% API
%%%=============================================================================
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
%%TODO: fix later...
lookup(Key) -> {ok, Key}.
%%%=============================================================================
%%% gen_server callbacks
%%%=============================================================================
init(_Args) ->
%%TODO: Load application config.
ets:new(?MODULE, [set, protected, named_table]),
{ok, none}.
handle_call(_Request, _From, State) ->
{reply, ok, State}.
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%%=============================================================================
%%% Internal functions
%%%=============================================================================

View File

@ -0,0 +1,103 @@
%%%-----------------------------------------------------------------------------
%%% @Copyright (C) 2012-2015, Feng Lee <feng@emqtt.io>
%%%
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
%%% of this software and associated documentation files (the "Software"), to deal
%%% in the Software without restriction, including without limitation the rights
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%%% copies of the Software, and to permit persons to whom the Software is
%%% furnished to do so, subject to the following conditions:
%%%
%%% The above copyright notice and this permission notice shall be included in all
%%% copies or substantial portions of the Software.
%%%
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% emqttd control commands.
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqttd_ctl).
-author('feng@emqtt.io').
-include("emqttd.hrl").
-define(PRINT_MSG(Msg),
io:format(Msg)).
-define(PRINT(Format, Args),
io:format(Format, Args)).
-export([status/1,
cluster/1,
useradd/1,
userdel/1]).
%TODO: add comment
% bridge
% metrics
% broker
% sockets
status([]) ->
{InternalStatus, _ProvidedStatus} = init:get_status(),
?PRINT("Node ~p is ~p~n", [node(), InternalStatus]),
case lists:keysearch(emqttd, 1, application:which_applications()) of
false ->
?PRINT_MSG("emqttd is not running~n");
{value,_Version} ->
?PRINT_MSG("emqttd is running~n")
end.
cluster([]) ->
Nodes = [node()|nodes()],
?PRINT("cluster nodes: ~p~n", [Nodes]);
cluster([SNode]) ->
Node = node_name(SNode),
case net_adm:ping(Node) of
pong ->
application:stop(emqttd),
application:stop(esockd),
mnesia:stop(),
mnesia:start(),
mnesia:change_config(extra_db_nodes, [Node]),
application:start(esockd),
application:start(emqttd),
?PRINT("cluster with ~p successfully.~n", [Node]);
pang ->
?PRINT("failed to connect to ~p~n", [Node])
end.
useradd([Username, Password]) ->
?PRINT("~p", [emqttd_auth:add(list_to_binary(Username), list_to_binary(Password))]).
userdel([Username]) ->
?PRINT("~p", [emqttd_auth:delete(list_to_binary(Username))]).
node_name(SNode) ->
SNode1 =
case string:tokens(SNode, "@") of
[_Node, _Server] ->
SNode;
_ ->
case net_kernel:longnames() of
true ->
SNode ++ "@" ++ inet_db:gethostname() ++
"." ++ inet_db:res_option(domain);
false ->
SNode ++ "@" ++ inet_db:gethostname();
_ ->
SNode
end
end,
list_to_atom(SNode1).

View File

@ -0,0 +1,33 @@
%%%-----------------------------------------------------------------------------
%%% @Copyright (C) 2012-2015, Feng Lee <feng@emqtt.io>
%%%
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
%%% of this software and associated documentation files (the "Software"), to deal
%%% in the Software without restriction, including without limitation the rights
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%%% copies of the Software, and to permit persons to whom the Software is
%%% furnished to do so, subject to the following conditions:
%%%
%%% The above copyright notice and this permission notice shall be included in all
%%% copies or substantial portions of the Software.
%%%
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% emqttd event manager.
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqttd_event).
-export([start_link/0]).
start_link() ->
gen_event:start_link({local, ?MODULE}).

View File

@ -0,0 +1,97 @@
%%%-----------------------------------------------------------------------------
%%% @Copyright (C) 2012-2015, Feng Lee <feng@emqtt.io>
%%%
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
%%% of this software and associated documentation files (the "Software"), to deal
%%% in the Software without restriction, including without limitation the rights
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%%% copies of the Software, and to permit persons to whom the Software is
%%% furnished to do so, subject to the following conditions:
%%%
%%% The above copyright notice and this permission notice shall be included in all
%%% copies or substantial portions of the Software.
%%%
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% emqttd http handler.
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqttd_http).
-author('feng@emqtt.io').
-include("emqttd.hrl").
-include("emqttd_packet.hrl").
-import(proplists, [get_value/2, get_value/3]).
-export([handle/1]).
handle(Req) ->
case authorized(Req) of
true ->
Path = Req:get(path),
Method = Req:get(method),
handle(Method, Path, Req);
false ->
Req:respond({401, [], <<"Fobbiden">>})
end.
handle('POST', "/mqtt/publish", Req) ->
Params = mochiweb_request:parse_post(Req),
lager:info("HTTP Publish: ~p~n", [Params]),
Qos = int(get_value("qos", Params, "0")),
Retain = bool(get_value("retain", Params, "0")),
Topic = list_to_binary(get_value("topic", Params)),
Message = list_to_binary(get_value("message", Params)),
case {validate(qos, Qos), validate(topic, Topic)} of
{true, true} ->
emqttd_router:route(#mqtt_message{qos = Qos,
retain = Retain,
topic = Topic,
payload = Message}),
Req:ok({"text/plan", <<"ok\n">>});
{false, _} ->
Req:respond({400, [], <<"Bad QoS">>});
{_, false} ->
Req:respond({400, [], <<"Bad Topic">>})
end;
handle(_Method, _Path, Req) ->
Req:not_found().
%%------------------------------------------------------------------------------
%% basic authorization
%%------------------------------------------------------------------------------
authorized(Req) ->
case mochiweb_request:get_header_value("Authorization", Req) of
undefined ->
false;
"Basic " ++ BasicAuth ->
emqttd_auth:check(user_passwd(BasicAuth))
end.
user_passwd(BasicAuth) ->
list_to_tuple(binary:split(base64:decode(BasicAuth), <<":">>)).
validate(qos, Qos) ->
(Qos >= ?QOS_0) and (Qos =< ?QOS_2);
validate(topic, Topic) ->
emqttd_topic:validate({name, Topic}).
int(S) -> list_to_integer(S).
bool("0") -> false;
bool("1") -> true.

View File

@ -20,11 +20,11 @@
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% emqtt keepalive.
%%% emqttd keepalive.
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqtt_keepalive).
-module(emqttd_keepalive).
-author('feng@emqtt.io').

View File

@ -0,0 +1,147 @@
%%%-----------------------------------------------------------------------------
%%% @Copyright (C) 2012-2015, Feng Lee <feng@emqtt.io>
%%%
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
%%% of this software and associated documentation files (the "Software"), to deal
%%% in the Software without restriction, including without limitation the rights
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%%% copies of the Software, and to permit persons to whom the Software is
%%% furnished to do so, subject to the following conditions:
%%%
%%% The above copyright notice and this permission notice shall be included in all
%%% copies or substantial portions of the Software.
%%%
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% emqttd message.
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqttd_message).
-author('feng@emqtt.io').
-include("emqttd.hrl").
-include("emqttd_packet.hrl").
-export([from_packet/1, to_packet/1]).
-export([set_flag/1, set_flag/2, unset_flag/1, unset_flag/2]).
-export([dump/1]).
%%------------------------------------------------------------------------------
%% @doc
%% Message from Packet.
%%
%% @end
%%------------------------------------------------------------------------------
-spec from_packet(mqtt_packet()) -> mqtt_message().
from_packet(#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH,
retain = Retain,
qos = Qos,
dup = Dup},
variable = #mqtt_packet_publish{topic_name = Topic,
packet_id = PacketId},
payload = Payload}) ->
#mqtt_message{msgid = PacketId,
qos = Qos,
retain = Retain,
dup = Dup,
topic = Topic,
payload = Payload};
from_packet(#mqtt_packet_connect{will_flag = false}) ->
undefined;
from_packet(#mqtt_packet_connect{will_retain = Retain,
will_qos = Qos,
will_topic = Topic,
will_msg = Msg}) ->
#mqtt_message{retain = Retain,
qos = Qos,
topic = Topic,
dup = false,
payload = Msg}.
%%------------------------------------------------------------------------------
%% @doc
%% Message to packet
%%
%% @end
%%------------------------------------------------------------------------------
-spec( to_packet( mqtt_message() ) -> mqtt_packet() ).
to_packet(#mqtt_message{msgid = MsgId,
qos = Qos,
retain = Retain,
dup = Dup,
topic = Topic,
payload = Payload}) ->
PacketId = if
Qos =:= ?QOS_0 -> undefined;
true -> MsgId
end,
#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH,
qos = Qos,
retain = Retain,
dup = Dup},
variable = #mqtt_packet_publish{topic_name = Topic,
packet_id = PacketId},
payload = Payload}.
%%------------------------------------------------------------------------------
%% @doc
%% set dup, retain flag
%%
%% @end
%%------------------------------------------------------------------------------
-spec set_flag(mqtt_message()) -> mqtt_message().
set_flag(Msg) ->
Msg#mqtt_message{dup = true, retain = true}.
-spec set_flag(atom(), mqtt_message()) -> mqtt_message().
set_flag(dup, Msg = #mqtt_message{dup = false}) ->
Msg#mqtt_message{dup = true};
set_flag(retain, Msg = #mqtt_message{retain = false}) ->
Msg#mqtt_message{retain = true};
set_flag(Flag, Msg) when Flag =:= dup orelse Flag =:= retain -> Msg.
%%------------------------------------------------------------------------------
%% @doc
%% Unset dup, retain flag
%%
%% @end
%%------------------------------------------------------------------------------
-spec unset_flag(mqtt_message()) -> mqtt_message().
unset_flag(Msg) ->
Msg#mqtt_message{dup = false, retain = false}.
-spec unset_flag(dup | retain | atom(), mqtt_message()) -> mqtt_message().
unset_flag(dup, Msg = #mqtt_message{dup = true}) ->
Msg#mqtt_message{dup = false};
unset_flag(retain, Msg = #mqtt_message{retain = true}) ->
Msg#mqtt_message{retain = false};
unset_flag(Flag, Msg) when Flag =:= dup orelse Flag =:= retain -> Msg.
%%------------------------------------------------------------------------------
%% @doc
%% Dump mqtt message.
%%
%% @end
%%------------------------------------------------------------------------------
dump(#mqtt_message{msgid= MsgId, qos = Qos, retain = Retain, dup = Dup, topic = Topic}) ->
io_lib:format("Message(MsgId=~p, Qos=~p, Retain=~s, Dup=~s, Topic=~s)",
[MsgId, Qos, Retain, Dup, Topic]).

View File

@ -20,21 +20,21 @@
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% emqtt metrics module. responsible for collecting broker metrics.
%%% emqttd metrics. responsible for collecting broker metrics.
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqtt_metrics).
-module(emqttd_metrics).
-include("emqtt_packet.hrl").
-include("emqttd_packet.hrl").
-include("emqtt_systop.hrl").
-include("emqttd_systop.hrl").
-behaviour(gen_server).
-define(SERVER, ?MODULE).
-define(METRIC_TAB, ?MODULE).
-define(METRIC_TAB, mqtt_broker_metric).
%% API Function Exports
-export([start_link/1]).
@ -182,7 +182,7 @@ init(Options) ->
% Init metrics
[new_metric(Metric) || Metric <- Metrics],
% $SYS Topics for metrics
[{atomic, _} = emqtt_pubsub:create(systop(Topic)) || {_, Topic} <- Metrics],
[{atomic, _} = emqttd_pubsub:create(systop(Topic)) || {_, Topic} <- Metrics],
PubInterval = proplists:get_value(pub_interval, Options, 60),
{ok, tick(random:uniform(PubInterval), #state{pub_interval = PubInterval}), hibernate}.
@ -214,7 +214,7 @@ systop(Name) when is_atom(Name) ->
list_to_binary(lists:concat(["$SYS/brokers/", node(), "/", Name])).
publish(Topic, Payload) ->
emqtt_router:route(#mqtt_message{topic = Topic, payload = Payload}).
emqttd_router:route(#mqtt_message{topic = Topic, payload = Payload}).
new_metric({gauge, Name}) ->
ets:insert(?METRIC_TAB, {{Name, 0}, 0});

View File

@ -0,0 +1,48 @@
%%%-----------------------------------------------------------------------------
%%% @Copyright (C) 2012-2015, Feng Lee <feng@emqtt.io>
%%%
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
%%% of this software and associated documentation files (the "Software"), to deal
%%% in the Software without restriction, including without limitation the rights
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%%% copies of the Software, and to permit persons to whom the Software is
%%% furnished to do so, subject to the following conditions:
%%%
%%% The above copyright notice and this permission notice shall be included in all
%%% copies or substantial portions of the Software.
%%%
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% emqttd mnesia.
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqttd_mnesia).
-author('feng@emqtt.io').
-export([init/0, wait/0, stop/0]).
init() ->
case mnesia:system_info(extra_db_nodes) of
[] -> mnesia:create_schema([node()]);
_ -> ok
end,
ok = mnesia:start(),
mnesia:wait_for_tables(mnesia:system_info(local_tables), infinity).
%TODO: timeout should be configured?
wait() ->
mnesia:wait_for_tables([topic, topic_trie, topic_trie_node, mqtt_user], 60000).
stop() ->
mnesia:stop().

View File

@ -0,0 +1,93 @@
%%%-----------------------------------------------------------------------------
%%% @Copyright (C) 2012-2015, Feng Lee <feng@emqtt.io>
%%%
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
%%% of this software and associated documentation files (the "Software"), to deal
%%% in the Software without restriction, including without limitation the rights
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%%% copies of the Software, and to permit persons to whom the Software is
%%% furnished to do so, subject to the following conditions:
%%%
%%% The above copyright notice and this permission notice shall be included in all
%%% copies or substantial portions of the Software.
%%%
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% emqttd vm monitor.
%%%
%%% @end
%%%-----------------------------------------------------------------------------
%%TODO: this is a demo module....
-module(emqttd_monitor).
-author('feng@emqtt.io').
-behavior(gen_server).
-export([start_link/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
-record(state, {ok}).
%%------------------------------------------------------------------------------
%% @doc
%% Start emqttd monitor.
%%
%% @end
%%------------------------------------------------------------------------------
-spec start_link() -> {ok, pid()} | ignore | {error, term()}.
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
%%%=============================================================================
%%% gen_server callbacks
%%%=============================================================================
init([]) ->
erlang:system_monitor(self(), [{long_gc, 5000}, {large_heap, 1000000}, busy_port]),
{ok, #state{}}.
handle_call(Request, _From, State) ->
lager:error("unexpected request: ~p", [Request]),
{stop, {error, unexpected_request}, State}.
handle_cast(Msg, State) ->
lager:error("unexpected msg: ~p", [Msg]),
{noreply, State}.
handle_info({monitor, GcPid, long_gc, Info}, State) ->
lager:error("long_gc: gcpid = ~p, ~p ~n ~p", [GcPid, process_info(GcPid,
[registered_name, memory, message_queue_len,heap_size,total_heap_size]), Info]),
{noreply, State};
handle_info({monitor, GcPid, large_heap, Info}, State) ->
lager:error("large_heap: gcpid = ~p,~p ~n ~p", [GcPid, process_info(GcPid,
[registered_name, memory, message_queue_len,heap_size,total_heap_size]), Info]),
{noreply, State};
handle_info({monitor, SusPid, busy_port, Port}, State) ->
lager:error("busy_port: suspid = ~p, port = ~p", [process_info(SusPid,
[registered_name, memory, message_queue_len,heap_size,total_heap_size]), Port]),
{noreply, State};
handle_info(Info, State) ->
lager:error("unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.

View File

@ -1,25 +1,30 @@
%%-----------------------------------------------------------------------------
%% Copyright (c) 2012-2015, Feng Lee <feng@emqtt.io>
%%
%% Permission is hereby granted, free of charge, to any person obtaining a copy
%% of this software and associated documentation files (the "Software"), to deal
%% in the Software without restriction, including without limitation the rights
%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%% copies of the Software, and to permit persons to whom the Software is
%% furnished to do so, subject to the following conditions:
%%
%% The above copyright notice and this permission notice shall be included in all
%% copies or substantial portions of the Software.
%%
%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%% SOFTWARE.
%%------------------------------------------------------------------------------
-module(emqtt_net).
%%%-----------------------------------------------------------------------------
%%% @Copyright (C) 2012-2015, Feng Lee <feng@emqtt.io>
%%%
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
%%% of this software and associated documentation files (the "Software"), to deal
%%% in the Software without restriction, including without limitation the rights
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%%% copies of the Software, and to permit persons to whom the Software is
%%% furnished to do so, subject to the following conditions:
%%%
%%% The above copyright notice and this permission notice shall be included in all
%%% copies or substantial portions of the Software.
%%%
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% emqttd net utility functions. some functions copied from rabbitmq.
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqttd_net).
-author('feng@emqtt.io').

View File

@ -1,4 +1,4 @@
%%-----------------------------------------------------------------------------
%%------------------------------------------------------------------------------
%% Copyright (c) 2012-2015, Feng Lee <feng@emqtt.io>
%%
%% Permission is hereby granted, free of charge, to any person obtaining a copy
@ -19,8 +19,9 @@
%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%% SOFTWARE.
%%------------------------------------------------------------------------------
-module(emqtt_bridge).
%%% @doc
%%% clusted nodes monitor.
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqttd_nodes_monitor).

View File

@ -20,11 +20,11 @@
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% emqtt options handler.
%%% emqttd options handler.
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqtt_opts).
-module(emqttd_opts).
-export([merge/2]).

View File

@ -20,15 +20,15 @@
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% emqtt packet.
%%% emqttd packet.
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqtt_packet).
-module(emqttd_packet).
-author("feng@emqtt.io").
-include("emqtt_packet.hrl").
-include("emqttd_packet.hrl").
%% API
-export([protocol_name/1, type_name/1, connack_name/1]).
@ -108,7 +108,7 @@ dump_variable(#mqtt_packet_connect{
will_topic = WillTopic,
will_msg = WillMsg,
username = Username,
password = Password} ) ->
password = Password}) ->
Format = "ClientId=~s, ProtoName=~s, ProtoVsn=~p, CleanSess=~s, KeepAlive=~p, Username=~s, Password=~s",
Args = [ClientId, ProtoName, ProtoVer, CleanSess, KeepAlive, Username, dump_password(Password)],
{Format1, Args1} = if
@ -118,37 +118,30 @@ dump_variable(#mqtt_packet_connect{
end,
io_lib:format(Format1, Args1);
dump_variable(#mqtt_packet_connack{
ack_flags = AckFlags,
return_code = ReturnCode } ) ->
dump_variable(#mqtt_packet_connack{ack_flags = AckFlags,
return_code = ReturnCode } ) ->
io_lib:format("AckFlags=~p, RetainCode=~p", [AckFlags, ReturnCode]);
dump_variable(#mqtt_packet_publish{
topic_name = TopicName,
packet_id = PacketId} ) ->
dump_variable(#mqtt_packet_publish{topic_name = TopicName,
packet_id = PacketId}) ->
io_lib:format("TopicName=~s, PacketId=~p", [TopicName, PacketId]);
dump_variable(#mqtt_packet_puback{
packet_id = PacketId } ) ->
dump_variable(#mqtt_packet_puback{packet_id = PacketId}) ->
io_lib:format("PacketId=~p", [PacketId]);
dump_variable(#mqtt_packet_subscribe{
packet_id = PacketId,
topic_table = TopicTable }) ->
dump_variable(#mqtt_packet_subscribe{packet_id = PacketId,
topic_table = TopicTable}) ->
io_lib:format("PacketId=~p, TopicTable=~p", [PacketId, TopicTable]);
dump_variable(#mqtt_packet_unsubscribe{
packet_id = PacketId,
topics = Topics }) ->
dump_variable(#mqtt_packet_unsubscribe{packet_id = PacketId,
topics = Topics}) ->
io_lib:format("PacketId=~p, Topics=~p", [PacketId, Topics]);
dump_variable(#mqtt_packet_suback{
packet_id = PacketId,
qos_table = QosTable} ) ->
dump_variable(#mqtt_packet_suback{packet_id = PacketId,
qos_table = QosTable}) ->
io_lib:format("PacketId=~p, QosTable=~p", [PacketId, QosTable]);
dump_variable(#mqtt_packet_unsuback{
packet_id = PacketId } ) ->
dump_variable(#mqtt_packet_unsuback{packet_id = PacketId}) ->
io_lib:format("PacketId=~p", [PacketId]);
dump_variable(PacketId) when is_integer(PacketId) ->

View File

@ -20,15 +20,15 @@
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% emqtt received packet parser.
%%% emqttd received packet parser.
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqtt_parser).
-module(emqttd_parser).
-author("feng@emqtt.io").
-include("emqtt_packet.hrl").
-include("emqttd_packet.hrl").
%% API
-export([init/0, parse/2]).

View File

@ -0,0 +1,27 @@
%%%-----------------------------------------------------------------------------
%%% @Copyright (C) 2012-2015, Feng Lee <feng@emqtt.io>
%%%
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
%%% of this software and associated documentation files (the "Software"), to deal
%%% in the Software without restriction, including without limitation the rights
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%%% copies of the Software, and to permit persons to whom the Software is
%%% furnished to do so, subject to the following conditions:
%%%
%%% The above copyright notice and this permission notice shall be included in all
%%% copies or substantial portions of the Software.
%%%
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% emqtt plugin framework.
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqttd_plugin).

View File

@ -24,11 +24,11 @@
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqtt_protocol).
-module(emqttd_protocol).
-include("emqtt.hrl").
-include("emqttd.hrl").
-include("emqtt_packet.hrl").
-include("emqttd_packet.hrl").
%% API
-export([init/3, client_id/1]).
@ -62,7 +62,7 @@ init(Transport, Socket, Peername) ->
client_id(#proto_state{client_id = ClientId}) -> ClientId.
%%SHOULD be registered in emqtt_cm
%%SHOULD be registered in emqttd_cm
info(#proto_state{proto_vsn = ProtoVsn,
proto_name = ProtoName,
client_id = ClientId,
@ -90,7 +90,7 @@ received(_Packet, State = #proto_state{connected = false}) ->
received(Packet = ?PACKET(_Type), State = #proto_state{peer_name = PeerName,
client_id = ClientId}) ->
lager:info("RECV from ~s@~s: ~s", [ClientId, PeerName, emqtt_packet:dump(Packet)]),
lager:info("RECV from ~s@~s: ~s", [ClientId, PeerName, emqttd_packet:dump(Packet)]),
case validate_packet(Packet) of
ok ->
handle(Packet, State);
@ -106,16 +106,16 @@ handle(Packet = ?CONNECT_PACKET(Var), State = #proto_state{peer_name = PeerName}
keep_alive = KeepAlive,
client_id = ClientId} = Var,
lager:info("RECV from ~s@~s: ~s", [ClientId, PeerName, emqtt_packet:dump(Packet)]),
lager:info("RECV from ~s@~s: ~s", [ClientId, PeerName, emqttd_packet:dump(Packet)]),
{ReturnCode1, State1} =
case validate_connect(Var) of
?CONNACK_ACCEPT ->
case emqtt_auth:check(Username, Password) of
case emqttd_auth:check(Username, Password) of
true ->
ClientId1 = clientid(ClientId, State),
start_keepalive(KeepAlive),
emqtt_cm:register(ClientId1, self()),
emqttd_cm:register(ClientId1, self()),
{?CONNACK_ACCEPT, State#proto_state{will_msg = willmsg(Var),
clean_sess = CleanSess,
client_id = ClientId1}};
@ -128,27 +128,27 @@ handle(Packet = ?CONNECT_PACKET(Var), State = #proto_state{peer_name = PeerName}
end,
send(?CONNACK_PACKET(ReturnCode1), State1),
%%Starting session
{ok, Session} = emqtt_session:start({CleanSess, ClientId, self()}),
{ok, Session} = emqttd_session:start({CleanSess, ClientId, self()}),
{ok, State1#proto_state{session = Session}};
handle(Packet = ?PUBLISH_PACKET(?QOS_0, _Topic, _PacketId, _Payload),
State = #proto_state{session = Session}) ->
emqtt_session:publish(Session, {?QOS_0, emqtt_message:from_packet(Packet)}),
emqttd_session:publish(Session, {?QOS_0, emqttd_message:from_packet(Packet)}),
{ok, State};
handle(Packet = ?PUBLISH_PACKET(?QOS_1, _Topic, PacketId, _Payload),
State = #proto_state{session = Session}) ->
emqtt_session:publish(Session, {?QOS_1, emqtt_message:from_packet(Packet)}),
emqttd_session:publish(Session, {?QOS_1, emqttd_message:from_packet(Packet)}),
send(?PUBACK_PACKET(?PUBACK, PacketId), State);
handle(Packet = ?PUBLISH_PACKET(?QOS_2, _Topic, PacketId, _Payload),
State = #proto_state{session = Session}) ->
NewSession = emqtt_session:publish(Session, {?QOS_2, emqtt_message:from_packet(Packet)}),
NewSession = emqttd_session:publish(Session, {?QOS_2, emqttd_message:from_packet(Packet)}),
send(?PUBACK_PACKET(?PUBREC, PacketId), State#proto_state{session = NewSession});
handle(?PUBACK_PACKET(Type, PacketId), State = #proto_state{session = Session})
when Type >= ?PUBACK andalso Type =< ?PUBCOMP ->
NewSession = emqtt_session:puback(Session, {Type, PacketId}),
NewSession = emqttd_session:puback(Session, {Type, PacketId}),
NewState = State#proto_state{session = NewSession},
if
Type =:= ?PUBREC ->
@ -161,11 +161,11 @@ handle(?PUBACK_PACKET(Type, PacketId), State = #proto_state{session = Session})
{ok, NewState};
handle(?SUBSCRIBE_PACKET(PacketId, TopicTable), State = #proto_state{session = Session}) ->
{ok, NewSession, GrantedQos} = emqtt_session:subscribe(Session, TopicTable),
{ok, NewSession, GrantedQos} = emqttd_session:subscribe(Session, TopicTable),
send(?SUBACK_PACKET(PacketId, GrantedQos), State#proto_state{session = NewSession});
handle(?UNSUBSCRIBE_PACKET(PacketId, Topics), State = #proto_state{session = Session}) ->
{ok, NewSession} = emqtt_session:unsubscribe(Session, Topics),
{ok, NewSession} = emqttd_session:unsubscribe(Session, Topics),
send(?UNSUBACK_PACKET(PacketId), State#proto_state{session = NewSession});
handle(?PACKET(?PINGREQ), State) ->
@ -179,24 +179,24 @@ handle(?PACKET(?DISCONNECT), State) ->
-spec send({pid() | tuple(), mqtt_message()} | mqtt_packet(), proto_state()) -> {ok, proto_state()}.
%% qos0 message
send({_From, Message = #mqtt_message{qos = ?QOS_0}}, State) ->
send(emqtt_message:to_packet(Message), State);
send(emqttd_message:to_packet(Message), State);
%% message from session
send({_From = SessPid, Message}, State = #proto_state{session = SessPid}) when is_pid(SessPid) ->
send(emqtt_message:to_packet(Message), State);
send(emqttd_message:to_packet(Message), State);
%% message(qos1, qos2) not from session
send({_From, Message = #mqtt_message{qos = Qos}}, State = #proto_state{session = Session})
when (Qos =:= ?QOS_1) orelse (Qos =:= ?QOS_2) ->
{Message1, NewSession} = emqtt_session:store(Session, Message),
send(emqtt_message:to_packet(Message1), State#proto_state{session = NewSession});
{Message1, NewSession} = emqttd_session:store(Session, Message),
send(emqttd_message:to_packet(Message1), State#proto_state{session = NewSession});
send(Packet, State = #proto_state{transport = Transport, socket = Sock, peer_name = PeerName, client_id = ClientId}) when is_record(Packet, mqtt_packet) ->
lager:info("SENT to ~s@~s: ~s", [ClientId, PeerName, emqtt_packet:dump(Packet)]),
lager:info("SENT to ~s@~s: ~s", [ClientId, PeerName, emqttd_packet:dump(Packet)]),
sent_stats(Packet),
Data = emqtt_serialiser:serialise(Packet),
Data = emqttd_serialiser:serialise(Packet),
lager:debug("SENT to ~s: ~p", [PeerName, Data]),
emqtt_metrics:inc('bytes/sent', size(Data)),
emqttd_metrics:inc('bytes/sent', size(Data)),
Transport:send(Sock, Data),
{ok, State}.
@ -213,7 +213,7 @@ shutdown(Error, #proto_state{peer_name = PeerName, client_id = ClientId, will_ms
ok.
willmsg(Packet) when is_record(Packet, mqtt_packet_connect) ->
emqtt_message:from_packet(Packet).
emqttd_message:from_packet(Packet).
clientid(<<>>, #proto_state{peer_name = PeerName}) ->
<<"eMQTT/", (base64:encode(PeerName))/binary>>;
@ -224,7 +224,7 @@ clientid(ClientId, _State) -> ClientId.
send_willmsg(undefined) -> ignore;
%%TODO:should call session...
send_willmsg(WillMsg) -> emqtt_router:route(WillMsg).
send_willmsg(WillMsg) -> emqttd_router:route(WillMsg).
start_keepalive(0) -> ignore;
start_keepalive(Sec) when Sec > 0 ->
@ -264,7 +264,7 @@ validate_clientid(#mqtt_packet_connect {proto_ver = Ver, clean_sess = CleanSess,
validate_packet(#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH},
variable = #mqtt_packet_publish{topic_name = Topic}}) ->
case emqtt_topic:validate({name, Topic}) of
case emqttd_topic:validate({name, Topic}) of
true -> ok;
false -> lager:warning("Error publish topic: ~p", [Topic]), {error, badtopic}
end;
@ -288,7 +288,7 @@ validate_topics(Type, []) when Type =:= name orelse Type =:= filter ->
validate_topics(Type, Topics) when Type =:= name orelse Type =:= filter ->
ErrTopics = [Topic || {Topic, Qos} <- Topics,
not (emqtt_topic:validate({Type, Topic}) and validate_qos(Qos))],
not (emqttd_topic:validate({Type, Topic}) and validate_qos(Qos))],
case ErrTopics of
[] -> ok;
_ -> lager:error("Error Topics: ~p", [ErrTopics]), {error, badtopic}
@ -299,22 +299,22 @@ validate_qos(Qos) when Qos =< ?QOS_2 -> true;
validate_qos(_) -> false.
try_unregister(undefined, _) -> ok;
try_unregister(ClientId, _) -> emqtt_cm:unregister(ClientId, self()).
try_unregister(ClientId, _) -> emqttd_cm:unregister(ClientId, self()).
sent_stats(?PACKET(Type)) ->
emqtt_metrics:inc('packets/sent'),
emqttd_metrics:inc('packets/sent'),
inc(Type).
inc(?CONNACK) ->
emqtt_metrics:inc('packets/connack');
emqttd_metrics:inc('packets/connack');
inc(?PUBLISH) ->
emqtt_metrics:inc('messages/sent'),
emqtt_metrics:inc('packets/publish/sent');
emqttd_metrics:inc('messages/sent'),
emqttd_metrics:inc('packets/publish/sent');
inc(?SUBACK) ->
emqtt_metrics:inc('packets/suback');
emqttd_metrics:inc('packets/suback');
inc(?UNSUBACK) ->
emqtt_metrics:inc('packets/unsuback');
emqttd_metrics:inc('packets/unsuback');
inc(?PINGRESP) ->
emqtt_metrics:inc('packets/pingresp');
emqttd_metrics:inc('packets/pingresp');
inc(_) ->
ingore.

View File

@ -24,7 +24,7 @@
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqtt_pubsub).
-module(emqttd_pubsub).
-author('feng@emqtt.io').
@ -32,11 +32,11 @@
-define(SERVER, ?MODULE).
-include("emqtt.hrl").
-include("emqttd.hrl").
-include("emqtt_topic.hrl").
-include("emqttd_topic.hrl").
-include("emqtt_packet.hrl").
-include("emqttd_packet.hrl").
-include_lib("stdlib/include/qlc.hrl").
@ -184,7 +184,7 @@ dispatch(Topic, Msg = #mqtt_message{qos = Qos}) when is_binary(Topic) ->
%%------------------------------------------------------------------------------
-spec match(Topic :: binary()) -> [topic()].
match(Topic) when is_binary(Topic) ->
TrieNodes = mnesia:async_dirty(fun trie_match/1, [emqtt_topic:words(Topic)]),
TrieNodes = mnesia:async_dirty(fun trie_match/1, [emqttd_topic:words(Topic)]),
Names = [Name || #topic_trie_node{topic=Name} <- TrieNodes, Name=/= undefined],
lists:flatten([mnesia:dirty_read(topic, Name) || Name <- Names]).
@ -306,7 +306,7 @@ try_remove_subscriber({Topic, Qos}, SubPid) ->
try_remove_topic(Name) when is_binary(Name) ->
case ets:member(topic_subscriber, Name) of
false ->
Topic = emqtt_topic:new(Name),
Topic = emqttd_topic:new(Name),
Fun = fun() ->
mnesia:delete_object(Topic),
case mnesia:read(topic, Name) of
@ -320,7 +320,7 @@ try_remove_topic(Name) when is_binary(Name) ->
end.
trie_add(Topic) when is_binary(Topic) ->
mnesia:write(emqtt_topic:new(Topic)),
mnesia:write(emqttd_topic:new(Topic)),
case mnesia:read(topic_trie_node, Topic) of
[TrieNode=#topic_trie_node{topic=undefined}] ->
mnesia:write(TrieNode#topic_trie_node{topic=Topic});
@ -328,7 +328,7 @@ trie_add(Topic) when is_binary(Topic) ->
{atomic, already_exist};
[] ->
%add trie path
[trie_add_path(Triple) || Triple <- emqtt_topic:triples(Topic)],
[trie_add_path(Triple) || Triple <- emqttd_topic:triples(Topic)],
%add last node
mnesia:write(#topic_trie_node{node_id=Topic, topic=Topic})
end.
@ -337,7 +337,7 @@ trie_delete(Topic) when is_binary(Topic) ->
case mnesia:read(topic_trie_node, Topic) of
[#topic_trie_node{edge_count=0}] ->
mnesia:delete({topic_trie_node, Topic}),
trie_delete_path(lists:reverse(emqtt_topic:triples(Topic)));
trie_delete_path(lists:reverse(emqttd_topic:triples(Topic)));
[TrieNode] ->
mnesia:write(TrieNode#topic_trie_node{topic=Topic});
[] ->
@ -400,19 +400,19 @@ trie_delete_path([{NodeId, Word, _} | RestPath]) ->
end.
setstats(State = #state{max_subs = Max}) ->
emqtt_broker:setstat('topics/count', mnesia:table_info(topic, size)),
emqttd_broker:setstat('topics/count', mnesia:table_info(topic, size)),
SubCount = ets:info(topic_subscriber, size),
emqtt_broker:setstat('subscribers/count', SubCount),
emqttd_broker:setstat('subscribers/count', SubCount),
if
SubCount > Max ->
emqtt_broker:setstat('subscribers/max', SubCount),
emqttd_broker:setstat('subscribers/max', SubCount),
State#state{max_subs = SubCount};
true ->
State
end.
dropped(true) ->
emqtt_metrics:inc('messages/dropped');
emqttd_metrics:inc('messages/dropped');
dropped(false) ->
ok.

View File

@ -0,0 +1,96 @@
%%%-----------------------------------------------------------------------------
%%% @Copyright (C) 2012-2015, Feng Lee <feng@emqtt.io>
%%%
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
%%% of this software and associated documentation files (the "Software"), to deal
%%% in the Software without restriction, including without limitation the rights
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%%% copies of the Software, and to permit persons to whom the Software is
%%% furnished to do so, subject to the following conditions:
%%%
%%% The above copyright notice and this permission notice shall be included in all
%%% copies or substantial portions of the Software.
%%%
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% emqttd simple queue.
%%%
%%% @end
%%%-----------------------------------------------------------------------------
%% TODO: this module should be removed...
-module(emqttd_queue).
-include("emqttd_packet.hrl").
-export([new/1, new/2, in/3, all/1, clear/1]).
-define(DEFAULT_MAX_LEN, 1000).
-record(mqtt_queue_wrapper, {queue = queue:new(),
max_len = ?DEFAULT_MAX_LEN,
store_qos0 = false}).
-type mqtt_queue() :: #mqtt_queue_wrapper{}.
%%------------------------------------------------------------------------------
%% @doc
%% New Queue.
%%
%% @end
%%------------------------------------------------------------------------------
-spec new(non_neg_integer()) -> mqtt_queue().
new(MaxLen) -> #mqtt_queue_wrapper{max_len = MaxLen}.
new(MaxLen, StoreQos0) -> #mqtt_queue_wrapper{max_len = MaxLen, store_qos0 = StoreQos0}.
%%------------------------------------------------------------------------------
%% @doc
%% Queue one message.
%%
%% @end
%%------------------------------------------------------------------------------
-spec in(binary(), mqtt_message(), mqtt_queue()) -> mqtt_queue().
in(ClientId, Message = #mqtt_message{qos = Qos},
Wrapper = #mqtt_queue_wrapper{queue = Queue, max_len = MaxLen}) ->
case queue:len(Queue) < MaxLen of
true ->
Wrapper#mqtt_queue_wrapper{queue = queue:in(Message, Queue)};
false -> % full
if
Qos =:= ?QOS_0 ->
lager:warning("Queue ~s drop qos0 message: ~p", [ClientId, Message]),
Wrapper;
true ->
{{value, Msg}, Queue1} = queue:drop(Queue),
lager:warning("Queue ~s drop message: ~p", [ClientId, Msg]),
Wrapper#mqtt_queue_wrapper{queue = Queue1}
end
end.
%%------------------------------------------------------------------------------
%% @doc
%% Get all messages in queue.
%%
%% @end
%%------------------------------------------------------------------------------
-spec all(mqtt_queue()) -> list().
all(#mqtt_queue_wrapper { queue = Queue }) -> queue:to_list(Queue).
%%------------------------------------------------------------------------------
%% @doc
%% Clear queue.
%%
%% @end
%%------------------------------------------------------------------------------
-spec clear(mqtt_queue()) -> mqtt_queue().
clear(Queue) -> Queue#mqtt_queue_wrapper{queue = queue:new()}.

View File

@ -21,62 +21,60 @@
%%------------------------------------------------------------------------------
%%route chain... statistics
-module(emqtt_router).
-module(emqttd_router).
-include("emqtt.hrl").
-include("emqttd_packet.hrl").
-behaviour(gen_server).
-define(SERVER, ?MODULE).
%% ------------------------------------------------------------------
%% API Function Exports
%% ------------------------------------------------------------------
-export([start_link/0]).
%%Router Chain-->
%%--->In
%%Out<---
%%Router Chain--> --->In Out<---
-export([route/1]).
%% ------------------------------------------------------------------
%% gen_server Function Exports
%% ------------------------------------------------------------------
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
%%----------------------------------------------------------------------------
-record(state, {}).
-ifdef(use_specs).
-spec(start_link/1 :: () -> {ok, pid()}).
-spec route(mqtt_message()) -> ok.
-endif.
%% ------------------------------------------------------------------
%% API Function Definitions
%% ------------------------------------------------------------------
%%%=============================================================================
%%% API
%%%=============================================================================
%%------------------------------------------------------------------------------
%% @doc
%% Start emqttd router.
%%
%% @end
%%------------------------------------------------------------------------------
-spec start_link() -> {ok, pid()} | ignore | {error, term()}.
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
%%------------------------------------------------------------------------------
%% @doc
%% Route mqtt message.
%%
%% @end
%%------------------------------------------------------------------------------
-spec route(mqtt_message()) -> ok.
route(Msg) ->
lager:info("Route ~s", [emqtt_message:dump(Msg)]),
% need to retain?
emqtt_server:retain(Msg),
lager:info("Route ~s", [emqttd_message:dump(Msg)]),
% TODO: need to retain?
emqttd_server:retain(Msg),
% unset flag and pubsub
emqtt_pubsub:publish( emqtt_message:unset_flag(Msg) ).
emqttd_pubsub:publish(emqttd_message:unset_flag(Msg)).
%% ------------------------------------------------------------------
%% gen_server Function Definitions
%% ------------------------------------------------------------------
%%%=============================================================================
%%% gen_server callbacks
%%%=============================================================================
init(Args) ->
{ok, Args, hibernate}.
init([]) ->
{ok, #state{}, hibernate}.
handle_call(_Request, _From, State) ->
{reply, ok, State}.
@ -93,7 +91,7 @@ terminate(_Reason, _State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%% ------------------------------------------------------------------
%% Internal Function Definitions
%% ------------------------------------------------------------------
%%%=============================================================================
%%% Internal functions
%%%=============================================================================

View File

@ -24,11 +24,11 @@
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqtt_serialiser).
-module(emqttd_serialiser).
-author("feng@emqtt.io").
-include("emqtt_packet.hrl").
-include("emqttd_packet.hrl").
%% API
-export([serialise/1]).

View File

@ -24,7 +24,7 @@
%%% TODO: redesign...
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqtt_server).
-module(emqttd_server).
-author('feng@slimpp.io').
@ -32,17 +32,15 @@
-define(SERVER, ?MODULE).
-include("emqtt.hrl").
-include("emqttd.hrl").
-include("emqtt_topic.hrl").
-include("emqttd_topic.hrl").
-include("emqtt_packet.hrl").
-record(emqtt_retained, {topic, qos, payload}).
-include("emqttd_packet.hrl").
-record(state, {store_limit}).
-define(RETAINED_TAB, emqtt_retained).
-define(RETAINED_TAB, mqtt_retained).
-define(STORE_LIMIT, 1000000).
@ -85,7 +83,7 @@ init([Opts]) ->
mnesia:create_table(?RETAINED_TAB, [
{type, ordered_set},
{ram_copies, [node()]},
{attributes, record_info(fields, emqtt_retained)}]),
{attributes, record_info(fields, mqtt_retained)}]),
mnesia:add_table_copy(?RETAINED_TAB, node(), ram_copies),
Limit = proplists:get_value(store_limit, Opts, ?STORE_LIMIT),
{ok, #state{store_limit = Limit}}.
@ -102,10 +100,10 @@ handle_cast({retain, Msg = #mqtt_message{topic = Topic,
lager:error("Dropped message(retain) for table is full: ~p", [Msg]);
_ ->
lager:debug("Retained message: ~p", [Msg]),
mnesia:dirty_write(#emqtt_retained{topic = Topic,
mnesia:dirty_write(#mqtt_retained{topic = Topic,
qos = Qos,
payload = Payload}),
emqtt_metrics:set('messages/retained/count',
emqttd_metrics:set('messages/retained/count',
mnesia:table_info(?RETAINED_TAB, size))
end,
{noreply, State};
@ -131,14 +129,14 @@ match(Topics) ->
lists:flatten([match(Topic, RetainedTopics) || Topic <- Topics]).
match(Topic, RetainedTopics) ->
case emqtt_topic:type(#topic{name=Topic}) of
case emqttd_topic:type(#topic{name=Topic}) of
direct -> %% FIXME
[Topic];
wildcard ->
[T || T <- RetainedTopics, emqtt_topic:match(T, Topic)]
[T || T <- RetainedTopics, emqttd_topic:match(T, Topic)]
end.
retained_msg(#emqtt_retained{topic = Topic, qos = Qos, payload = Payload}) ->
retained_msg(#mqtt_retained{topic = Topic, qos = Qos, payload = Payload}) ->
#mqtt_message{qos = Qos, retain = true, topic = Topic, payload = Payload}.

View File

@ -24,11 +24,11 @@
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqtt_session).
-module(emqttd_session).
-include("emqtt.hrl").
-include("emqttd.hrl").
-include("emqtt_packet.hrl").
-include("emqttd_packet.hrl").
%% API Function Exports
-export([start/1,
@ -75,11 +75,11 @@
-spec start({boolean(), binary(), pid()}) -> {ok, session()}.
start({true = _CleanSess, ClientId, _ClientPid}) ->
%%Destroy old session if CleanSess is true before.
ok = emqtt_sm:destroy_session(ClientId),
ok = emqttd_sm:destroy_session(ClientId),
{ok, initial_state(ClientId)};
start({false = _CleanSess, ClientId, ClientPid}) ->
{ok, SessPid} = emqtt_sm:start_session(ClientId, ClientPid),
{ok, SessPid} = emqttd_sm:start_session(ClientId, ClientPid),
{ok, SessPid}.
%%------------------------------------------------------------------------------
@ -103,10 +103,10 @@ resume(SessPid, ClientId, ClientPid) when is_pid(SessPid) ->
%%------------------------------------------------------------------------------
-spec publish(session(), {mqtt_qos(), mqtt_message()}) -> session().
publish(Session, {?QOS_0, Message}) ->
emqtt_router:route(Message), Session;
emqttd_router:route(Message), Session;
publish(Session, {?QOS_1, Message}) ->
emqtt_router:route(Message), Session;
emqttd_router:route(Message), Session;
publish(SessState = #session_state{awaiting_rel = AwaitingRel},
{?QOS_2, Message = #mqtt_message{msgid = MsgId}}) ->
@ -151,7 +151,7 @@ puback(SessPid, {?PUBREC, PacketId}) when is_pid(SessPid) ->
puback(SessState = #session_state{client_id = ClientId,
awaiting_rel = Awaiting}, {?PUBREL, PacketId}) ->
case maps:find(PacketId, Awaiting) of
{ok, Msg} -> emqtt_router:route(Msg);
{ok, Msg} -> emqttd_router:route(Msg);
error -> lager:warning("Session ~s: PUBREL PacketId '~p' not found!", [ClientId, PacketId])
end,
SessState#session_state{awaiting_rel = maps:remove(PacketId, Awaiting)};
@ -185,9 +185,9 @@ subscribe(SessState = #session_state{client_id = ClientId, submap = SubMap}, Top
_ -> lager:warning("~s resubscribe ~p", [ClientId, Resubs])
end,
SubMap1 = lists:foldl(fun({Name, Qos}, Acc) -> maps:put(Name, Qos, Acc) end, SubMap, Topics),
{ok, GrantedQos} = emqtt_pubsub:subscribe(Topics, self()),
{ok, GrantedQos} = emqttd_pubsub:subscribe(Topics, self()),
%%TODO: should be gen_event and notification...
emqtt_server:subscribe([ Name || {Name, _} <- Topics ], self()),
emqttd_server:subscribe([ Name || {Name, _} <- Topics ], self()),
{ok, SessState#session_state{submap = SubMap1}, GrantedQos};
subscribe(SessPid, Topics) when is_pid(SessPid) ->
@ -208,7 +208,7 @@ unsubscribe(SessState = #session_state{client_id = ClientId, submap = SubMap}, T
BadUnsubs -> lager:warning("~s should not unsubscribe ~p", [ClientId, BadUnsubs])
end,
%%unsubscribe from topic tree
ok = emqtt_pubsub:unsubscribe(Topics, self()),
ok = emqttd_pubsub:unsubscribe(Topics, self()),
SubMap1 = lists:foldl(fun(Topic, Acc) -> maps:remove(Topic, Acc) end, SubMap, Topics),
{ok, SessState#session_state{submap = SubMap1}};
@ -263,7 +263,7 @@ init([SessOpts, ClientId, ClientPid]) ->
true = link(ClientPid),
State = initial_state(ClientId, ClientPid),
Expires = proplists:get_value(expires, SessOpts, 1) * 3600,
MsgQueue = emqtt_queue:new(proplists:get_value(max_queue, SessOpts, 1000),
MsgQueue = emqttd_queue:new(proplists:get_value(max_queue, SessOpts, 1000),
proplists:get_value(store_qos0, SessOpts, false)),
{ok, State#session_state{expires = Expires,
msg_queue = MsgQueue}, hibernate}.
@ -304,10 +304,10 @@ handle_cast({resume, ClientId, ClientPid}, State = #session_state{
%% send offline messages
lists:foreach(fun(Msg) ->
ClientPid ! {dispatch, {self(), Msg}}
end, emqtt_queue:all(Queue)),
end, emqttd_queue:all(Queue)),
{noreply, State#session_state{client_pid = ClientPid,
msg_queue = emqtt_queue:clear(Queue),
msg_queue = emqttd_queue:clear(Queue),
expire_timer = undefined}, hibernate};
handle_cast({publish, ?QOS_2, Message}, State) ->
@ -379,7 +379,7 @@ dispatch(Message = #mqtt_message{qos = Qos}, State = #session_state{client_pid =
NewState.
queue(ClientId, Message, State = #session_state{msg_queue = Queue}) ->
State#session_state{msg_queue = emqtt_queue:in(ClientId, Message, Queue)}.
State#session_state{msg_queue = emqttd_queue:in(ClientId, Message, Queue)}.
next_msg_id(State = #session_state{message_id = 16#ffff}) ->
State#session_state{message_id = 1};

View File

@ -0,0 +1,54 @@
%%%-----------------------------------------------------------------------------
%%% @Copyright (C) 2012-2015, Feng Lee <feng@emqtt.io>
%%%
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
%%% of this software and associated documentation files (the "Software"), to deal
%%% in the Software without restriction, including without limitation the rights
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%%% copies of the Software, and to permit persons to whom the Software is
%%% furnished to do so, subject to the following conditions:
%%%
%%% The above copyright notice and this permission notice shall be included in all
%%% copies or substantial portions of the Software.
%%%
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% emqttd session supervisor.
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqttd_session_sup).
-author('feng@emqtt.io').
-behavior(supervisor).
-export([start_link/1, start_session/2]).
-export([init/1]).
%TODO: FIX COMMENTS...
-spec start_link([tuple()]) -> {ok, pid()}.
start_link(SessOpts) ->
supervisor:start_link({local, ?MODULE}, ?MODULE, [SessOpts]).
-spec start_session(binary(), pid()) -> {ok, pid()}.
start_session(ClientId, ClientPid) ->
supervisor:start_child(?MODULE, [ClientId, ClientPid]).
%%%=============================================================================
%%% Supervisor callbacks
%%%=============================================================================
init([SessOpts]) ->
{ok, {{simple_one_for_one, 0, 1},
[{session, {emqttd_session, start_link, [SessOpts]},
transient, 10000, worker, [emqttd_session]}]}}.

View File

@ -20,7 +20,7 @@
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% emqtt session manager.
%%% emqttd session manager.
%%%
%%% The Session state in the Server consists of:
%%% The existence of a Session, even if the rest of the Session state is empty.
@ -34,17 +34,17 @@
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqtt_sm).
-module(emqttd_sm).
%%cleanSess: true | false
-include("emqtt.hrl").
-include("emqttd.hrl").
-behaviour(gen_server).
-define(SERVER, ?MODULE).
-define(SESSION_TAB, emqtt_session).
-define(SESSION_TAB, mqtt_session).
%% API Function Exports
-export([start_link/0]).
@ -110,10 +110,10 @@ handle_call({start_session, ClientId, ClientPid}, _From, State) ->
Reply =
case ets:lookup(?SESSION_TAB, ClientId) of
[{_, SessPid, _MRef}] ->
emqtt_session:resume(SessPid, ClientId, ClientPid),
emqttd_session:resume(SessPid, ClientId, ClientPid),
{ok, SessPid};
[] ->
case emqtt_session_sup:start_session(ClientId, ClientPid) of
case emqttd_session_sup:start_session(ClientId, ClientPid) of
{ok, SessPid} ->
MRef = erlang:monitor(process, SessPid),
ets:insert(?SESSION_TAB, {ClientId, SessPid, MRef}),
@ -128,7 +128,7 @@ handle_call({destroy_session, ClientId}, _From, State) ->
case ets:lookup(?SESSION_TAB, ClientId) of
[{_, SessPid, MRef}] ->
erlang:demonitor(MRef),
emqtt_session:destroy(SessPid, ClientId),
emqttd_session:destroy(SessPid, ClientId),
ets:delete(?SESSION_TAB, ClientId);
[] ->
ignore
@ -160,10 +160,10 @@ code_change(_OldVsn, State, _Extra) ->
setstats(State = #state{max = Max}) ->
Count = ets:info(?SESSION_TAB, size),
emqtt_broker:setstat('sessions/count', Count),
emqttd_broker:setstat('sessions/count', Count),
if
Count > Max ->
emqtt_broker:setstat('sessions/max', Count),
emqttd_broker:setstat('sessions/max', Count),
State#state{max = Count};
true ->
State

View File

@ -0,0 +1,67 @@
%%%-----------------------------------------------------------------------------
%%% @Copyright (C) 2012-2015, Feng Lee <feng@emqtt.io>
%%%
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
%%% of this software and associated documentation files (the "Software"), to deal
%%% in the Software without restriction, including without limitation the rights
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%%% copies of the Software, and to permit persons to whom the Software is
%%% furnished to do so, subject to the following conditions:
%%%
%%% The above copyright notice and this permission notice shall be included in all
%%% copies or substantial portions of the Software.
%%%
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% emqttd supervisor.
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqttd_sup).
-author('feng@emqtt.io').
-include("emqttd.hrl").
-behaviour(supervisor).
%% API
-export([start_link/0, start_child/1, start_child/2]).
%% Supervisor callbacks
-export([init/1]).
%% Helper macro for declaring children of supervisor
-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}).
%%%=============================================================================
%%% API
%%%=============================================================================
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
start_child(ChildSpec) when is_tuple(ChildSpec) ->
supervisor:start_child(?MODULE, ChildSpec).
%%
%% start_child(Mod::atom(), Type::type()) -> {ok, pid()}
%% @type type() = worker | supervisor
%%
start_child(Mod, Type) when is_atom(Mod) and is_atom(Type) ->
supervisor:start_child(?MODULE, ?CHILD(Mod, Type)).
%%%=============================================================================
%%% Supervisor callbacks
%%%=============================================================================
init([]) ->
{ok, {{one_for_all, 10, 100}, []}}.

View File

@ -0,0 +1,30 @@
%%%-----------------------------------------------------------------------------
%%% @Copyright (C) 2012-2015, Feng Lee <feng@emqtt.io>
%%%
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
%%% of this software and associated documentation files (the "Software"), to deal
%%% in the Software without restriction, including without limitation the rights
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%%% copies of the Software, and to permit persons to whom the Software is
%%% furnished to do so, subject to the following conditions:
%%%
%%% The above copyright notice and this permission notice shall be included in all
%%% copies or substantial portions of the Software.
%%%
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% emqttd throttle.
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqttd_throttle).
%% TODO:... 0.6.0...

View File

@ -20,7 +20,7 @@
%% SOFTWARE.
%%------------------------------------------------------------------------------
-module(emqtt_topic).
-module(emqttd_topic).
-author('feng@emqtt.io').
@ -46,7 +46,7 @@
%% There can be any number of root nodes; that is, there can be any number of topic trees.
%% ------------------------------------------------------------------------
-include("emqtt_topic.hrl").
-include("emqttd_topic.hrl").
-export([new/1, type/1, match/2, validate/1, triples/1, words/1]).

View File

@ -1,4 +1,4 @@
%%-----------------------------------------------------------------------------
%%------------------------------------------------------------------------------
%% Copyright (c) 2012-2015, Feng Lee <feng@emqtt.io>
%%
%% Permission is hereby granted, free of charge, to any person obtaining a copy
@ -19,11 +19,9 @@
%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%% SOFTWARE.
%%------------------------------------------------------------------------------
-module(emqtt_event).
-export([start_link/0]).
start_link() ->
gen_event:start_link({local, ?MODULE}).
%%% @doc
%%% emqttd erlang vm.
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqttd_vm).

48
apps/emqttd/src/x.erl Normal file
View File

@ -0,0 +1,48 @@
-module(x).
%% ------------------------------------------------------------------
%% API Function Exports
%% ------------------------------------------------------------------
-export([start_link/0]).
%% ------------------------------------------------------------------
%% gen_server Function Exports
%% ------------------------------------------------------------------
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
%% ------------------------------------------------------------------
%% API Function Definitions
%% ------------------------------------------------------------------
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
%% ------------------------------------------------------------------
%% gen_server Function Definitions
%% ------------------------------------------------------------------
init(Args) ->
{ok, Args}.
handle_call(_Request, _From, State) ->
{reply, ok, State}.
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%% ------------------------------------------------------------------
%% Internal Function Definitions
%% ------------------------------------------------------------------

View File

@ -19,9 +19,7 @@
{sub_dirs, [
"rel",
"apps/emqtt"]}.
{lib_dirs, ["apps/emqtt"]}.
"apps/emqttd"]}.
{deps, [
{lager, ".*", {git, "git://github.com/basho/lager.git", {branch, "master"}}},

View File

@ -5,7 +5,7 @@
{start_pg2, true}
]},
{sasl, [
{sasl_error_logger, {file, "log/emqtt_sasl.log"}}
{sasl_error_logger, {file, "log/emqttd_sasl.log"}}
]},
{mnesia, [
{dir, "data"}
@ -15,18 +15,18 @@
]},
{lager, [
{error_logger_redirect, false},
{crash_log, "log/emqtt_crash.log"},
{crash_log, "log/emqttd_crash.log"},
{handlers, [
{lager_console_backend, debug},
{lager_file_backend, [
{file, "log/emqtt_error.log"},
{file, "log/emqttd_error.log"},
{level, error},
{size, 10485760},
{date, "$D0"},
{count, 5}
]},
{lager_file_backend, [
{file, "log/emqtt_info.log"},
{file, "log/emqttd_info.log"},
{level, info},
{size, 10485760},
{date, "$D0"},
@ -37,7 +37,7 @@
{esockd, [
{logger, {lager, info}}
]},
{emqtt, [
{emqttd, [
%Authetication. Internal, Anonymous Default.
{auth, {anonymous, []}},
{access, []},

View File

@ -146,11 +146,11 @@ case "$1" in
"exec $RUNNER_SCRIPT_DIR/$SCRIPT console" 2>&1
# Wait for the node to come up. We can't just ping it because
# distributed erlang comes up for a second before emqtt crashes
# distributed erlang comes up for a second before emqttd crashes
# (eg. in the case of an unwriteable disk). Once the node comes
# up we check for the node watcher process. If that's running
# then we assume things are good enough. This will at least let
# the user know when emqtt is crashing right after startup.
# the user know when emqttd is crashing right after startup.
WAIT=${WAIT_FOR_ERLANG:-15}
while [ $WAIT -gt 0 ]; do
WAIT=`expr $WAIT - 1`
@ -159,11 +159,11 @@ case "$1" in
if [ "$?" -ne 0 ]; then
continue
fi
echo "emqtt is started successfully!"
echo "emqttd is started successfully!"
exit 0
done
echo "emqtt failed to start within ${WAIT_FOR_ERLANG:-15} seconds,"
echo "see the output of 'emqtt console' for more information."
echo "emqttd failed to start within ${WAIT_FOR_ERLANG:-15} seconds,"
echo "see the output of 'emqttd console' for more information."
echo "If you want to wait longer, set the environment variable"
echo "WAIT_FOR_ERLANG to the number of seconds to wait."
exit 1

View File

@ -1,6 +1,6 @@
@setlocal
@set node_name=emqtt
@set node_name=emqttd
@rem Get the absolute path to the parent directory,
@rem which is assumed to be the node root.

View File

@ -94,7 +94,7 @@ case "$1" in
fi
shift
$NODETOOL rpc emqtt_ctl status $@
$NODETOOL rpc emqttd_ctl status $@
;;
cluster)
@ -106,12 +106,12 @@ case "$1" in
# Make sure the local node IS running
RES=`$NODETOOL ping`
if [ "$RES" != "pong" ]; then
echo "emqtt is not running!"
echo "emqttd is not running!"
exit 1
fi
shift
$NODETOOL rpc emqtt_ctl cluster $@
$NODETOOL rpc emqttd_ctl cluster $@
;;
useradd)
@ -123,12 +123,12 @@ case "$1" in
# Make sure the local node IS running
RES=`$NODETOOL ping`
if [ "$RES" != "pong" ]; then
echo "emqtt is not running!"
echo "emqttd is not running!"
exit 1
fi
shift
$NODETOOL rpc emqtt_ctl useradd $@
$NODETOOL rpc emqttd_ctl useradd $@
;;
userdel)
@ -140,17 +140,17 @@ case "$1" in
# Make sure the local node IS running
RES=`$NODETOOL ping`
if [ "$RES" != "pong" ]; then
echo "emqtt is not running!"
echo "emqttd is not running!"
exit 1
fi
shift
$NODETOOL rpc emqtt_ctl userdel $@
$NODETOOL rpc emqttd_ctl userdel $@
;;
*)
echo "Usage: $SCRIPT"
echo " status #query emqtt status"
echo " status #query emqttd status"
echo " cluster [<Node>] #query or cluster nodes"
echo " useradd <Username> <Password> #add user"
echo " userdel <Username> #delete user"

View File

@ -1,8 +1,8 @@
## Name of the node
-name emqtt@127.0.0.1
-name emqttd@127.0.0.1
## Cookie for distributed erlang
-setcookie emqttsecretcookie
-setcookie emqttdsecretcookie
## Heartbeat management; auto-restarts VM if it dies or becomes unresponsive
## (Disabled by default..use with caution!)

View File

@ -2,7 +2,7 @@
{lib_dirs, ["../apps", "../deps", "../plugins"]},
{erts, [{mod_cond, derived}, {app_file, strip}]},
{app_file, strip},
{rel, "emqtt", "0.2.0",
{rel, "emqttd", "0.5.0",
[
kernel,
stdlib,
@ -17,14 +17,14 @@
lager,
esockd,
mochiweb,
emqtt
emqttd
]},
{rel, "start_clean", "",
[
kernel,
stdlib
]},
{boot_rel, "emqtt"},
{boot_rel, "emqttd"},
{profile, embedded},
{incl_cond, derived},
%{mod_cond, derived},
@ -47,10 +47,10 @@
{app, lager, [{mod_cond, app}, {incl_cond, include}]},
{app, esockd, [{mod_cond, app}, {incl_cond, include}]},
{app, mochiweb, [{mod_cond, app}, {incl_cond, include}]},
{app, emqtt, [{mod_cond, app}, {incl_cond, include}]}
{app, emqttd, [{mod_cond, app}, {incl_cond, include}]}
]}.
{target_dir, "emqtt"}.
{target_dir, "emqttd"}.
{overlay_vars, "vars.config"}.
@ -60,9 +60,9 @@
{mkdir, "data/"},
{copy, "files/erl", "\{\{erts_vsn\}\}/bin/erl"},
{template, "files/nodetool", "\{\{erts_vsn\}\}/bin/nodetool"},
{template, "files/emqtt", "bin/emqtt"},
{template, "files/emqtt_ctl", "bin/emqtt_ctl"},
{template, "files/emqtt.cmd", "bin/emqtt.cmd"},
{template, "files/emqttd", "bin/emqttd"},
{template, "files/emqttd_ctl", "bin/emqttd_ctl"},
{template, "files/emqttd.cmd", "bin/emqttd.cmd"},
{copy, "files/start_erl.cmd", "bin/start_erl.cmd"},
{copy, "files/install_upgrade.escript", "bin/install_upgrade.escript"},
{copy, "files/ssl/ssl.crt", "etc/ssl.crt"},

View File

@ -18,7 +18,7 @@
%%
%%
%% bin/emqtt
%% bin/emqttd
%%
{runner_script_dir, "$(cd ${0%/*} && pwd)"}.
{runner_base_dir, "${RUNNER_SCRIPT_DIR%/*}"}.