rewrite all modules

This commit is contained in:
Feng Lee 2014-12-08 17:40:45 +08:00
parent 0dbb739416
commit 23163edab7
17 changed files with 122 additions and 168 deletions

View File

@ -31,7 +31,9 @@
-define(ERTS_MINIMUM, "6.0").
%% qos levels
%%------------------------------------------------------------------------------
%% MQTT Qos
%%------------------------------------------------------------------------------
-define(QOS_0, 0).
-define(QOS_1, 1).
@ -39,6 +41,9 @@
-type qos() :: ?QOS_2 | ?QOS_1 | ?QOS_0.
%%------------------------------------------------------------------------------
%% MQTT Message
%%------------------------------------------------------------------------------
-record(mqtt_msg, {
retain,
qos,
@ -51,4 +56,13 @@
-type mqtt_msg() :: #mqtt_msg{}.
%%------------------------------------------------------------------------------
%% MQTT User Management
%%------------------------------------------------------------------------------
-record(emqtt_user, {
username :: binary(),
passwdhash :: binary()
}).

View File

@ -2,7 +2,6 @@
% NOTICE: copy from rabbitmq mqtt-adaper
%
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License
@ -19,6 +18,8 @@
%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-define(PROTOCOL_NAMES, [{3, "MQIsdp"}, {4, "MQTT"}]).
-define(MQTT_PROTO_MAJOR, 3).
-define(MQTT_PROTO_MINOR, 1).

View File

@ -21,11 +21,8 @@
%%------------------------------------------------------------------------------
%%------------------------------------------------------------------------------
%% pubsub topic
%% Core PubSub Topic
%%------------------------------------------------------------------------------
%name: <<"a/b/c">>
%node: node()
%words: [<<"a">>, <<"b">>, <<"c">>]
-record(topic, {
name :: binary(),
node :: node()
@ -55,12 +52,3 @@
node_id :: binary()
}).
%%------------------------------------------------------------------------------
%% internal user
%%------------------------------------------------------------------------------
-record(internal_user, {
username :: binary(),
passwdhash :: binary()
}).

View File

@ -3,7 +3,7 @@
{description, "Erlang MQTT Broker"},
{vsn, git},
{modules, []},
{registered, [ ]},
{registered, []},
{applications, [kernel,
stdlib]},
{mod, {emqtt_app, []}},

View File

@ -40,7 +40,6 @@ listen({mqtt, Port, Options}) ->
esockd:listen(mqtt, Port, Options ++ ?MQTT_SOCKOPTS, MFArgs);
listen({http, Port, Options}) ->
Auth = proplists:get_value(auth, Options),
MFArgs = {emqtt_http, handle, [Auth]},
mochiweb:start_http(Port, proplists:delete(auth, Options), MFArgs).
MFArgs = {emqtt_http, handle, []},
mochiweb:start_http(Port, Options, MFArgs).

View File

@ -40,12 +40,13 @@
%%
start(_StartType, _StartArgs) ->
print_banner(),
{ok, SupPid} = emqtt_sup:start_link(),
{ok, Sup} = emqtt_sup:start_link(),
start_servers(Sup),
{ok, Listeners} = application:get_env(listen),
emqtt:listen(Listeners),
register(emqtt, self()),
print_vsn(),
{ok, SupPid}.
{ok, Sup}.
print_banner() ->
?PRINT("starting emqtt on node '~s'~n", [node()]).
@ -55,6 +56,40 @@ print_vsn() ->
{ok, Desc} = application:get_key(description),
?PRINT("~s ~s is running now~n", [Desc, Vsn]).
start_servers(Sup) ->
lists:foreach(
fun({Name, F}) when is_function(F) ->
?PRINT("~s is starting...", [Name]),
F(),
?PRINT_MSG("[done]~n");
({Name, Server}) when is_atom(Server) ->
?PRINT("~s is starting...", [Name]),
start_child(Sup, Server),
?PRINT_MSG("[done]~n");
({Name, Server, Opts}) when is_atom(Server) ->
?PRINT("~s is starting...", [ Name]),
start_child(Sup, Server, Opts),
?PRINT_MSG("[done]~n")
end,
[{"emqtt cm", emqtt_cm},
{"emqtt auth", emqtt_auth},
{"emqtt retained", emqtt_retained},
{"emqtt pubsub", emqtt_pubsub},
{"emqtt monitor", emqtt_monitor}
]).
start_child(Sup, Name) ->
{ok, _ChiId} = supervisor:start_child(Sup, worker_spec(Name)).
start_child(Sup, Name, Opts) ->
{ok, _ChiId} = supervisor:start_child(Sup, worker_spec(Name, Opts)).
worker_spec(Name) ->
{Name, {Name, start_link, []},
permanent, 5000, worker, [Name]}.
worker_spec(Name, Opts) ->
{Name, {Name, start_link, [Opts]},
permanent, 5000, worker, [Name]}.
%%
%% @spec stop(atom) -> 'ok'
%%

View File

@ -22,13 +22,12 @@
-module(emqtt_auth).
-author('ery.lee@gmail.com').
-author('feng.lee@slimchat.io').
-include("emqtt.hrl").
-include("emqtt_log.hrl").
-export([start_link/0,
add/2,
check/2,
@ -43,44 +42,40 @@
terminate/2,
code_change/3]).
-record(state, {authmod, authopts}).
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-spec check(Usename :: binary(), Password :: binary()) -> true | false.
check(Username, Password) ->
gen_server:call(?MODULE, {check, Username, Password}).
execute(check, [Username, Password]).
add(Username, Password) when is_binary(Username) ->
gen_server:call(?MODULE, {add, Username, Password}).
-spec add(Usename :: binary(), Password :: binary()) -> ok.
add(Username, Password) ->
execute(add, [Username, Password]).
delete(Username) when is_binary(Username) ->
gen_server:cast(?MODULE, {delete, Username}).
-spec delete(Username :: binary()) -> ok.
delete(Username) ->
execute(delete, [Username]).
execute(F, Args) ->
[{_, M}] = ets:lookup(emqtt_auth, mod),
apply(M, F, Args).
init([]) ->
{ok, {Name, Opts}} = application:get_env(auth),
AuthMod = authmod(Name),
ok = AuthMod:init(Opts),
?INFO("authmod is ~p", [AuthMod]),
?INFO("~p is started", [?MODULE]),
{ok, #state{authmod=AuthMod, authopts=Opts}}.
ets:new(emqtt_auth, [named_table, protected]),
ets:insert(emqtt_quth, {mod, AuthMod}),
?PRINT("emqtt authmod is ~p", [AuthMod]),
{ok, undefined}.
authmod(Name) when is_atom(Name) ->
list_to_atom(lists:concat(["emqtt_auth_", Name])).
handle_call({check, Username, Password}, _From, #state{authmod=AuthMod} = State) ->
{reply, AuthMod:check(Username, Password), State};
handle_call({add, Username, Password}, _From, #state{authmod=AuthMod} = State) ->
{reply, AuthMod:add(Username, Password), State};
handle_call(Req, _From, State) ->
{stop, {badreq, Req}, State}.
handle_cast({delete, Username}, #state{authmod=AuthMod} = State) ->
AuthMod:delete(Username),
{noreply, State};
handle_cast(Msg, State) ->
{stop, {badmsg, Msg}, State}.
@ -92,3 +87,4 @@ terminate(_Reason, _State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.

View File

@ -22,7 +22,7 @@
-module(emqtt_auth_anonymous).
-author('ery.lee@gmail.com').
-author('feng.lee@slimchat.io').
-export([init/1,
add/2,
@ -36,3 +36,4 @@ check(_, _) -> true.
add(_, _) -> ok.
delete(_Username) -> ok.

View File

@ -22,7 +22,7 @@
-module(emqtt_auth_internal).
-include("emqtt_internal.hrl").
-include("emqtt.hrl").
-export([init/1,
add/2,
@ -30,10 +30,10 @@
delete/1]).
init(_Opts) ->
mnesia:create_table(internal_user, [
mnesia:create_table(emqtt_user, [
{ram_copies, [node()]},
{attributes, record_info(fields, internal_user)}]),
mnesia:add_table_copy(internal_user, node(), ram_copies),
{attributes, record_info(fields, emqtt_user)}]),
mnesia:add_table_copy(emqtt_user, node(), ram_copies),
ok.
check(undefined, _) -> false;
@ -42,14 +42,14 @@ check(_, undefined) -> false;
check(Username, Password) when is_binary(Username) ->
PasswdHash = crypto:hash(md5, Password),
case mnesia:dirty_read(internal_user, Username) of
[#internal_user{passwdhash=PasswdHash}] -> true;
case mnesia:dirty_read(emqtt_user, Username) of
[#emqtt_user{passwdhash=PasswdHash}] -> true;
_ -> false
end.
add(Username, Password) when is_binary(Username) and is_binary(Password) ->
mnesia:dirty_write(#internal_user{username=Username, passwdhash=crypto:hash(md5, Password)}).
mnesia:dirty_write(#emqtt_user{username=Username, passwdhash=crypto:hash(md5, Password)}).
delete(Username) when is_binary(Username) ->
mnesia:dirty_delete(internal_user, Username).
mnesia:dirty_delete(emqtt_user, Username).

View File

@ -24,7 +24,7 @@
-behaviour(gen_server).
-export([start_link/1, info/1, go/2]).
-export([start_link/1, info/1, go/2, stop/2]).
-export([init/1,
handle_call/3,
@ -39,8 +39,6 @@
-include("emqtt_frame.hrl").
-include("emqtt_internal.hrl").
-define(CLIENT_ID_MAXLEN, 23).
-record(state, {socket,
@ -71,6 +69,9 @@ info(Pid) ->
go(Pid, Sock) ->
gen_server:call(Pid, {go, Sock}).
stop(Pid, Error) ->
gen_server:cast(Pid, {stop, Error}).
init([Sock]) ->
{ok, #state{socket = Sock}}.
@ -89,9 +90,6 @@ handle_call({go, Sock}, _From, State=#state{socket = Sock}) ->
awaiting_ack = gb_trees:empty(),
awaiting_rel = gb_trees:empty()})};
handle_call(duplicate_id, _From, State=#state{conn_name=ConnName, client_id=ClientId}) ->
?ERROR("Shutdown for duplicate clientid:~s, conn:~s", [ClientId, ConnName]),
stop({shutdown, duplicate_id}, State);
handle_call(info, _From, #state{conn_name=ConnName,
message_id=MsgId, client_id=ClientId} = State) ->
@ -103,6 +101,10 @@ handle_call(info, _From, #state{conn_name=ConnName,
handle_call(_Req, _From, State) ->
{reply, ok, State}.
handle_cast({stop, duplicate_id}, State=#state{conn_name=ConnName, client_id=ClientId}) ->
?ERROR("Shutdown for duplicate clientid:~s, conn:~s", [ClientId, ConnName]),
stop({shutdown, duplicate_id}, State);
handle_cast(Msg, State) ->
{stop, {badmsg, Msg}, State}.

View File

@ -56,15 +56,25 @@
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
-spec lookup(ClientId :: binary()) -> pid() | undefined.
lookup(ClientId) ->
case ets:lookup(emqtt_client, ClientId) of
[{_, Pid}] -> Pid;
[] -> undefined
end.
-spec create(ClientId :: binary(), Pid :: pid()) -> ok.
create(ClientId, Pid) ->
case lookup(ClientId) of
OldPid when is_pid(OldPid) ->
%%TODO: FIX STOP...
emqtt_client:stop(OldPid, duplicate_id);
undefined ->
ok
end,
ets:insert(emqtt_client, {ClientId, Pid}).
-spec destroy(binary() | pid()) -> ok.
destroy(ClientId) when is_binary(ClientId) ->
ets:delete(emqtt_client, ClientId);
@ -95,3 +105,4 @@ terminate(_Reason, _State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.

View File

@ -27,13 +27,10 @@
-include("emqtt_frame.hrl").
-include("emqtt_internal.hrl").
-export([parse/2, initial_state/0]).
-export([serialise/1]).
-define(RESERVED, 0).
-define(PROTOCOL_MAGIC, "MQIsdp").
-define(MAX_LEN, 16#fffffff).
-define(HIGHBIT, 2#10000000).
-define(LOWBITS, 2#01111111).
@ -68,7 +65,7 @@ parse_frame(Bin, #mqtt_frame_fixed{ type = Type,
qos = Qos } = Fixed, Length) ->
case {Type, Bin} of
{?CONNECT, <<FrameBin:Length/binary, Rest/binary>>} ->
{ProtocolMagic, Rest1} = parse_utf(FrameBin),
{ProtoName, Rest1} = parse_utf(FrameBin),
<<ProtoVersion : 8, Rest2/binary>> = Rest1,
<<UsernameFlag : 1,
PasswordFlag : 1,
@ -84,7 +81,7 @@ parse_frame(Bin, #mqtt_frame_fixed{ type = Type,
{WillMsg, Rest6} = parse_msg(Rest5, WillFlag),
{UserName, Rest7} = parse_utf(Rest6, UsernameFlag),
{PasssWord, <<>>} = parse_utf(Rest7, PasswordFlag),
case ProtocolMagic == ?PROTOCOL_MAGIC of
case protocol_name_approved(ProtoVersion, ProtoName) of
true ->
wrap(Fixed,
#mqtt_frame_connect{
@ -265,4 +262,5 @@ opt(false) -> 0;
opt(true) -> 1;
opt(X) when is_integer(X) -> X.
protocol_name_approved(Ver, Name) ->
lists:member({Ver, Name}, ?PROTOCOL_NAMES).

View File

@ -26,10 +26,10 @@
-import(proplists, [get_value/2, get_value/3]).
-export([handle/2]).
-export([handle/1]).
handle(Req, Auth) ->
case authorized(Req, Auth) of
handle(Req) ->
case authorized(Req) of
true ->
Path = Req:get(path),
Method = Req:get(method),
@ -44,11 +44,9 @@ handle('POST', "/mqtt/publish", Req) ->
error_logger:info_msg("~p~n", [Params]),
Topic = get_value("topic", Params),
Message = list_to_binary(get_value("message", Params)),
Qos = list_to_integer(get_value("qos", Params, "0")),
%TODO: DUP, RETAIN...
emqtt_pubsub:publish(#mqtt_msg {
retain = 0,
qos = Qos,
qos = ?QOS_0,
topic = Topic,
dup = 0,
payload = Message
@ -61,14 +59,13 @@ handle(_Method, _Path, Req) ->
%%------------------------------------------------------------------------------
%% basic authorization
%%------------------------------------------------------------------------------
authorized(Req, {Username, Password}) ->
authorized(Req) ->
case mochiweb_request:get_header_value("Authorization", Req) of
undefined -> false;
undefined ->
false;
"Basic " ++ BasicAuth ->
case user_passwd(BasicAuth) of
{Username, Password} -> true;
_ -> false
end
{Username, Password} = user_passwd(BasicAuth),
emqtt_auth:check(Username, Password)
end.
user_passwd(BasicAuth) ->

View File

@ -26,7 +26,7 @@
-include("emqtt_log.hrl").
-include("emqtt_internal.hrl").
-include("emqtt_topic.hrl").
-include_lib("stdlib/include/qlc.hrl").
@ -174,7 +174,6 @@ handle_info({'DOWN', Mon, _Type, _Object, _Info}, State) ->
undefined ->
?ERROR("unexpected 'DOWN': ~p", [Mon]);
SubPid ->
%?INFO("subscriber DOWN: ~p", [SubPid]),
erase({submon, Mon}),
erase({subscriber, SubPid}),
Subs = ets:match_object(topic_subscriber, #topic_subscriber{subpid=SubPid, _='_'}),

View File

@ -1,86 +0,0 @@
%%TODO: SHOULD BE REPLACED BY emqtt_cm.erl......
-module(emqtt_registry).
-include("emqtt.hrl").
-include("emqtt_log.hrl").
-export([start_link/0,
size/0,
register/2,
unregister/1]).
-behaviour(gen_server).
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3]).
-record(state, {}).
-define(SERVER, ?MODULE).
%%----------------------------------------------------------------------------
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
size() ->
ets:info(client, size).
register(ClientId, Pid) ->
gen_server:cast(?SERVER, {register, ClientId, Pid}).
unregister(ClientId) ->
gen_server:cast(?SERVER, {unregister, ClientId}).
%%----------------------------------------------------------------------------
init([]) ->
ets:new(client, [set, protected, named_table]),
?INFO("~p is started.", [?MODULE]),
{ok, #state{}}. % clientid -> {pid, monitor}
%%--------------------------------------------------------------------------
handle_call(Req, _From, State) ->
{stop, {badreq, Req}, State}.
handle_cast({register, ClientId, Pid}, State) ->
case ets:lookup(client, ClientId) of
[{_, {OldPid, MRef}}] ->
catch gen_server:call(OldPid, duplicate_id),
erlang:demonitor(MRef);
[] ->
ignore
end,
ets:insert(client, {ClientId, {Pid, erlang:monitor(process, Pid)}}),
{noreply, State};
handle_cast({unregister, ClientId}, State) ->
case ets:lookup(client, ClientId) of
[{_, {_Pid, MRef}}] ->
erlang:demonitor(MRef),
ets:delete(client, ClientId);
[] ->
ignore
end,
{noreply, State};
handle_cast(Msg, State) ->
{stop, {badmsg, Msg}, State}.
handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) ->
ets:match_delete(client, {'_', {DownPid, MRef}}),
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.

View File

@ -45,7 +45,7 @@
%% There can be any number of root nodes; that is, there can be any number of topic trees.
%% ------------------------------------------------------------------------
-include("emqtt_internal.hrl").
-include("emqtt_topic.hrl").
-export([new/1,
type/1,

View File

@ -36,12 +36,11 @@
{listen, [
{mqtt, 1883, [
{max_conns, 1024},
{acceptor_pool, 2}
{acceptor_pool, 4}
]},
{http, 8883, [
{max_conns, 512},
{acceptor_pool, 1},
{auth, {"user", "passwd"}}
{acceptor_pool, 1}
]}
]}
]}