From 23163edab773bb868eab0affa2454a30a05cb56c Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Mon, 8 Dec 2014 17:40:45 +0800 Subject: [PATCH] rewrite all modules --- apps/emqtt/include/emqtt.hrl | 16 +++- apps/emqtt/include/emqtt_frame.hrl | 3 +- .../{emqtt_internal.hrl => emqtt_topic.hrl} | 14 +-- apps/emqtt/src/emqtt.app.src | 2 +- apps/emqtt/src/emqtt.erl | 5 +- apps/emqtt/src/emqtt_app.erl | 39 ++++++++- apps/emqtt/src/emqtt_auth.erl | 40 ++++----- apps/emqtt/src/emqtt_auth_anonymous.erl | 3 +- apps/emqtt/src/emqtt_auth_internal.erl | 16 ++-- apps/emqtt/src/emqtt_client.erl | 14 +-- apps/emqtt/src/emqtt_cm.erl | 11 +++ apps/emqtt/src/emqtt_frame.erl | 10 +-- apps/emqtt/src/emqtt_http.erl | 21 ++--- apps/emqtt/src/emqtt_pubsub.erl | 3 +- apps/emqtt/src/emqtt_registry.erl | 86 ------------------- apps/emqtt/src/emqtt_topic.erl | 2 +- rel/files/app.config | 5 +- 17 files changed, 122 insertions(+), 168 deletions(-) rename apps/emqtt/include/{emqtt_internal.hrl => emqtt_topic.hrl} (84%) delete mode 100644 apps/emqtt/src/emqtt_registry.erl diff --git a/apps/emqtt/include/emqtt.hrl b/apps/emqtt/include/emqtt.hrl index b543bde66..ea8e9571d 100644 --- a/apps/emqtt/include/emqtt.hrl +++ b/apps/emqtt/include/emqtt.hrl @@ -31,7 +31,9 @@ -define(ERTS_MINIMUM, "6.0"). -%% qos levels +%%------------------------------------------------------------------------------ +%% MQTT Qos +%%------------------------------------------------------------------------------ -define(QOS_0, 0). -define(QOS_1, 1). @@ -39,6 +41,9 @@ -type qos() :: ?QOS_2 | ?QOS_1 | ?QOS_0. +%%------------------------------------------------------------------------------ +%% MQTT Message +%%------------------------------------------------------------------------------ -record(mqtt_msg, { retain, qos, @@ -51,4 +56,13 @@ -type mqtt_msg() :: #mqtt_msg{}. +%%------------------------------------------------------------------------------ +%% MQTT User Management +%%------------------------------------------------------------------------------ +-record(emqtt_user, { + username :: binary(), + passwdhash :: binary() +}). + + diff --git a/apps/emqtt/include/emqtt_frame.hrl b/apps/emqtt/include/emqtt_frame.hrl index b0ad1eee3..82c7d196c 100644 --- a/apps/emqtt/include/emqtt_frame.hrl +++ b/apps/emqtt/include/emqtt_frame.hrl @@ -2,7 +2,6 @@ % NOTICE: copy from rabbitmq mqtt-adaper % - %% The contents of this file are subject to the Mozilla Public License %% Version 1.1 (the "License"); you may not use this file except in %% compliance with the License. You may obtain a copy of the License @@ -19,6 +18,8 @@ %% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% +-define(PROTOCOL_NAMES, [{3, "MQIsdp"}, {4, "MQTT"}]). + -define(MQTT_PROTO_MAJOR, 3). -define(MQTT_PROTO_MINOR, 1). diff --git a/apps/emqtt/include/emqtt_internal.hrl b/apps/emqtt/include/emqtt_topic.hrl similarity index 84% rename from apps/emqtt/include/emqtt_internal.hrl rename to apps/emqtt/include/emqtt_topic.hrl index ed60b1ea4..059d53b71 100644 --- a/apps/emqtt/include/emqtt_internal.hrl +++ b/apps/emqtt/include/emqtt_topic.hrl @@ -21,11 +21,8 @@ %%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------ -%% pubsub topic +%% Core PubSub Topic %%------------------------------------------------------------------------------ -%name: <<"a/b/c">> -%node: node() -%words: [<<"a">>, <<"b">>, <<"c">>] -record(topic, { name :: binary(), node :: node() @@ -55,12 +52,3 @@ node_id :: binary() }). - -%%------------------------------------------------------------------------------ -%% internal user -%%------------------------------------------------------------------------------ --record(internal_user, { - username :: binary(), - passwdhash :: binary() -}). - diff --git a/apps/emqtt/src/emqtt.app.src b/apps/emqtt/src/emqtt.app.src index 68c2c1fbd..94c56b4df 100644 --- a/apps/emqtt/src/emqtt.app.src +++ b/apps/emqtt/src/emqtt.app.src @@ -3,7 +3,7 @@ {description, "Erlang MQTT Broker"}, {vsn, git}, {modules, []}, - {registered, [ ]}, + {registered, []}, {applications, [kernel, stdlib]}, {mod, {emqtt_app, []}}, diff --git a/apps/emqtt/src/emqtt.erl b/apps/emqtt/src/emqtt.erl index b618f26cc..42055f2f5 100644 --- a/apps/emqtt/src/emqtt.erl +++ b/apps/emqtt/src/emqtt.erl @@ -40,7 +40,6 @@ listen({mqtt, Port, Options}) -> esockd:listen(mqtt, Port, Options ++ ?MQTT_SOCKOPTS, MFArgs); listen({http, Port, Options}) -> - Auth = proplists:get_value(auth, Options), - MFArgs = {emqtt_http, handle, [Auth]}, - mochiweb:start_http(Port, proplists:delete(auth, Options), MFArgs). + MFArgs = {emqtt_http, handle, []}, + mochiweb:start_http(Port, Options, MFArgs). diff --git a/apps/emqtt/src/emqtt_app.erl b/apps/emqtt/src/emqtt_app.erl index eab07ffd2..8d446be36 100644 --- a/apps/emqtt/src/emqtt_app.erl +++ b/apps/emqtt/src/emqtt_app.erl @@ -40,12 +40,13 @@ %% start(_StartType, _StartArgs) -> print_banner(), - {ok, SupPid} = emqtt_sup:start_link(), + {ok, Sup} = emqtt_sup:start_link(), + start_servers(Sup), {ok, Listeners} = application:get_env(listen), emqtt:listen(Listeners), register(emqtt, self()), print_vsn(), - {ok, SupPid}. + {ok, Sup}. print_banner() -> ?PRINT("starting emqtt on node '~s'~n", [node()]). @@ -55,6 +56,40 @@ print_vsn() -> {ok, Desc} = application:get_key(description), ?PRINT("~s ~s is running now~n", [Desc, Vsn]). +start_servers(Sup) -> + lists:foreach( + fun({Name, F}) when is_function(F) -> + ?PRINT("~s is starting...", [Name]), + F(), + ?PRINT_MSG("[done]~n"); + ({Name, Server}) when is_atom(Server) -> + ?PRINT("~s is starting...", [Name]), + start_child(Sup, Server), + ?PRINT_MSG("[done]~n"); + ({Name, Server, Opts}) when is_atom(Server) -> + ?PRINT("~s is starting...", [ Name]), + start_child(Sup, Server, Opts), + ?PRINT_MSG("[done]~n") + end, + [{"emqtt cm", emqtt_cm}, + {"emqtt auth", emqtt_auth}, + {"emqtt retained", emqtt_retained}, + {"emqtt pubsub", emqtt_pubsub}, + {"emqtt monitor", emqtt_monitor} + ]). + +start_child(Sup, Name) -> + {ok, _ChiId} = supervisor:start_child(Sup, worker_spec(Name)). +start_child(Sup, Name, Opts) -> + {ok, _ChiId} = supervisor:start_child(Sup, worker_spec(Name, Opts)). + +worker_spec(Name) -> + {Name, {Name, start_link, []}, + permanent, 5000, worker, [Name]}. +worker_spec(Name, Opts) -> + {Name, {Name, start_link, [Opts]}, + permanent, 5000, worker, [Name]}. + %% %% @spec stop(atom) -> 'ok' %% diff --git a/apps/emqtt/src/emqtt_auth.erl b/apps/emqtt/src/emqtt_auth.erl index 7b13b2a8a..ede016cda 100644 --- a/apps/emqtt/src/emqtt_auth.erl +++ b/apps/emqtt/src/emqtt_auth.erl @@ -22,13 +22,12 @@ -module(emqtt_auth). --author('ery.lee@gmail.com'). +-author('feng.lee@slimchat.io'). -include("emqtt.hrl"). -include("emqtt_log.hrl"). - -export([start_link/0, add/2, check/2, @@ -43,44 +42,40 @@ terminate/2, code_change/3]). --record(state, {authmod, authopts}). - start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). +-spec check(Usename :: binary(), Password :: binary()) -> true | false. check(Username, Password) -> - gen_server:call(?MODULE, {check, Username, Password}). + execute(check, [Username, Password]). -add(Username, Password) when is_binary(Username) -> - gen_server:call(?MODULE, {add, Username, Password}). +-spec add(Usename :: binary(), Password :: binary()) -> ok. +add(Username, Password) -> + execute(add, [Username, Password]). -delete(Username) when is_binary(Username) -> - gen_server:cast(?MODULE, {delete, Username}). +-spec delete(Username :: binary()) -> ok. +delete(Username) -> + execute(delete, [Username]). + +execute(F, Args) -> + [{_, M}] = ets:lookup(emqtt_auth, mod), + apply(M, F, Args). init([]) -> {ok, {Name, Opts}} = application:get_env(auth), AuthMod = authmod(Name), ok = AuthMod:init(Opts), - ?INFO("authmod is ~p", [AuthMod]), - ?INFO("~p is started", [?MODULE]), - {ok, #state{authmod=AuthMod, authopts=Opts}}. + ets:new(emqtt_auth, [named_table, protected]), + ets:insert(emqtt_quth, {mod, AuthMod}), + ?PRINT("emqtt authmod is ~p", [AuthMod]), + {ok, undefined}. authmod(Name) when is_atom(Name) -> list_to_atom(lists:concat(["emqtt_auth_", Name])). -handle_call({check, Username, Password}, _From, #state{authmod=AuthMod} = State) -> - {reply, AuthMod:check(Username, Password), State}; - -handle_call({add, Username, Password}, _From, #state{authmod=AuthMod} = State) -> - {reply, AuthMod:add(Username, Password), State}; - handle_call(Req, _From, State) -> {stop, {badreq, Req}, State}. -handle_cast({delete, Username}, #state{authmod=AuthMod} = State) -> - AuthMod:delete(Username), - {noreply, State}; - handle_cast(Msg, State) -> {stop, {badmsg, Msg}, State}. @@ -92,3 +87,4 @@ terminate(_Reason, _State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. + diff --git a/apps/emqtt/src/emqtt_auth_anonymous.erl b/apps/emqtt/src/emqtt_auth_anonymous.erl index a41c66c00..8beb7463c 100644 --- a/apps/emqtt/src/emqtt_auth_anonymous.erl +++ b/apps/emqtt/src/emqtt_auth_anonymous.erl @@ -22,7 +22,7 @@ -module(emqtt_auth_anonymous). --author('ery.lee@gmail.com'). +-author('feng.lee@slimchat.io'). -export([init/1, add/2, @@ -36,3 +36,4 @@ check(_, _) -> true. add(_, _) -> ok. delete(_Username) -> ok. + diff --git a/apps/emqtt/src/emqtt_auth_internal.erl b/apps/emqtt/src/emqtt_auth_internal.erl index f0148cffe..109e87d9f 100644 --- a/apps/emqtt/src/emqtt_auth_internal.erl +++ b/apps/emqtt/src/emqtt_auth_internal.erl @@ -22,7 +22,7 @@ -module(emqtt_auth_internal). --include("emqtt_internal.hrl"). +-include("emqtt.hrl"). -export([init/1, add/2, @@ -30,10 +30,10 @@ delete/1]). init(_Opts) -> - mnesia:create_table(internal_user, [ + mnesia:create_table(emqtt_user, [ {ram_copies, [node()]}, - {attributes, record_info(fields, internal_user)}]), - mnesia:add_table_copy(internal_user, node(), ram_copies), + {attributes, record_info(fields, emqtt_user)}]), + mnesia:add_table_copy(emqtt_user, node(), ram_copies), ok. check(undefined, _) -> false; @@ -42,14 +42,14 @@ check(_, undefined) -> false; check(Username, Password) when is_binary(Username) -> PasswdHash = crypto:hash(md5, Password), - case mnesia:dirty_read(internal_user, Username) of - [#internal_user{passwdhash=PasswdHash}] -> true; + case mnesia:dirty_read(emqtt_user, Username) of + [#emqtt_user{passwdhash=PasswdHash}] -> true; _ -> false end. add(Username, Password) when is_binary(Username) and is_binary(Password) -> - mnesia:dirty_write(#internal_user{username=Username, passwdhash=crypto:hash(md5, Password)}). + mnesia:dirty_write(#emqtt_user{username=Username, passwdhash=crypto:hash(md5, Password)}). delete(Username) when is_binary(Username) -> - mnesia:dirty_delete(internal_user, Username). + mnesia:dirty_delete(emqtt_user, Username). diff --git a/apps/emqtt/src/emqtt_client.erl b/apps/emqtt/src/emqtt_client.erl index dc35f3149..ba487177c 100644 --- a/apps/emqtt/src/emqtt_client.erl +++ b/apps/emqtt/src/emqtt_client.erl @@ -24,7 +24,7 @@ -behaviour(gen_server). --export([start_link/1, info/1, go/2]). +-export([start_link/1, info/1, go/2, stop/2]). -export([init/1, handle_call/3, @@ -39,8 +39,6 @@ -include("emqtt_frame.hrl"). --include("emqtt_internal.hrl"). - -define(CLIENT_ID_MAXLEN, 23). -record(state, {socket, @@ -71,6 +69,9 @@ info(Pid) -> go(Pid, Sock) -> gen_server:call(Pid, {go, Sock}). +stop(Pid, Error) -> + gen_server:cast(Pid, {stop, Error}). + init([Sock]) -> {ok, #state{socket = Sock}}. @@ -89,9 +90,6 @@ handle_call({go, Sock}, _From, State=#state{socket = Sock}) -> awaiting_ack = gb_trees:empty(), awaiting_rel = gb_trees:empty()})}; -handle_call(duplicate_id, _From, State=#state{conn_name=ConnName, client_id=ClientId}) -> - ?ERROR("Shutdown for duplicate clientid:~s, conn:~s", [ClientId, ConnName]), - stop({shutdown, duplicate_id}, State); handle_call(info, _From, #state{conn_name=ConnName, message_id=MsgId, client_id=ClientId} = State) -> @@ -103,6 +101,10 @@ handle_call(info, _From, #state{conn_name=ConnName, handle_call(_Req, _From, State) -> {reply, ok, State}. +handle_cast({stop, duplicate_id}, State=#state{conn_name=ConnName, client_id=ClientId}) -> + ?ERROR("Shutdown for duplicate clientid:~s, conn:~s", [ClientId, ConnName]), + stop({shutdown, duplicate_id}, State); + handle_cast(Msg, State) -> {stop, {badmsg, Msg}, State}. diff --git a/apps/emqtt/src/emqtt_cm.erl b/apps/emqtt/src/emqtt_cm.erl index b1515dd75..bf3b4469d 100644 --- a/apps/emqtt/src/emqtt_cm.erl +++ b/apps/emqtt/src/emqtt_cm.erl @@ -56,15 +56,25 @@ start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). +-spec lookup(ClientId :: binary()) -> pid() | undefined. lookup(ClientId) -> case ets:lookup(emqtt_client, ClientId) of [{_, Pid}] -> Pid; [] -> undefined end. +-spec create(ClientId :: binary(), Pid :: pid()) -> ok. create(ClientId, Pid) -> + case lookup(ClientId) of + OldPid when is_pid(OldPid) -> + %%TODO: FIX STOP... + emqtt_client:stop(OldPid, duplicate_id); + undefined -> + ok + end, ets:insert(emqtt_client, {ClientId, Pid}). +-spec destroy(binary() | pid()) -> ok. destroy(ClientId) when is_binary(ClientId) -> ets:delete(emqtt_client, ClientId); @@ -95,3 +105,4 @@ terminate(_Reason, _State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. + diff --git a/apps/emqtt/src/emqtt_frame.erl b/apps/emqtt/src/emqtt_frame.erl index 156d4fb2a..5b93ab3dd 100644 --- a/apps/emqtt/src/emqtt_frame.erl +++ b/apps/emqtt/src/emqtt_frame.erl @@ -27,13 +27,10 @@ -include("emqtt_frame.hrl"). --include("emqtt_internal.hrl"). - -export([parse/2, initial_state/0]). -export([serialise/1]). -define(RESERVED, 0). --define(PROTOCOL_MAGIC, "MQIsdp"). -define(MAX_LEN, 16#fffffff). -define(HIGHBIT, 2#10000000). -define(LOWBITS, 2#01111111). @@ -68,7 +65,7 @@ parse_frame(Bin, #mqtt_frame_fixed{ type = Type, qos = Qos } = Fixed, Length) -> case {Type, Bin} of {?CONNECT, <>} -> - {ProtocolMagic, Rest1} = parse_utf(FrameBin), + {ProtoName, Rest1} = parse_utf(FrameBin), <> = Rest1, <>} = parse_utf(Rest7, PasswordFlag), - case ProtocolMagic == ?PROTOCOL_MAGIC of + case protocol_name_approved(ProtoVersion, ProtoName) of true -> wrap(Fixed, #mqtt_frame_connect{ @@ -265,4 +262,5 @@ opt(false) -> 0; opt(true) -> 1; opt(X) when is_integer(X) -> X. - +protocol_name_approved(Ver, Name) -> + lists:member({Ver, Name}, ?PROTOCOL_NAMES). diff --git a/apps/emqtt/src/emqtt_http.erl b/apps/emqtt/src/emqtt_http.erl index 63de4eaac..fab3ae4c4 100644 --- a/apps/emqtt/src/emqtt_http.erl +++ b/apps/emqtt/src/emqtt_http.erl @@ -26,10 +26,10 @@ -import(proplists, [get_value/2, get_value/3]). --export([handle/2]). +-export([handle/1]). -handle(Req, Auth) -> - case authorized(Req, Auth) of +handle(Req) -> + case authorized(Req) of true -> Path = Req:get(path), Method = Req:get(method), @@ -44,11 +44,9 @@ handle('POST', "/mqtt/publish", Req) -> error_logger:info_msg("~p~n", [Params]), Topic = get_value("topic", Params), Message = list_to_binary(get_value("message", Params)), - Qos = list_to_integer(get_value("qos", Params, "0")), - %TODO: DUP, RETAIN... emqtt_pubsub:publish(#mqtt_msg { retain = 0, - qos = Qos, + qos = ?QOS_0, topic = Topic, dup = 0, payload = Message @@ -61,14 +59,13 @@ handle(_Method, _Path, Req) -> %%------------------------------------------------------------------------------ %% basic authorization %%------------------------------------------------------------------------------ -authorized(Req, {Username, Password}) -> +authorized(Req) -> case mochiweb_request:get_header_value("Authorization", Req) of - undefined -> false; + undefined -> + false; "Basic " ++ BasicAuth -> - case user_passwd(BasicAuth) of - {Username, Password} -> true; - _ -> false - end + {Username, Password} = user_passwd(BasicAuth), + emqtt_auth:check(Username, Password) end. user_passwd(BasicAuth) -> diff --git a/apps/emqtt/src/emqtt_pubsub.erl b/apps/emqtt/src/emqtt_pubsub.erl index ed5e93fee..4eccbe356 100644 --- a/apps/emqtt/src/emqtt_pubsub.erl +++ b/apps/emqtt/src/emqtt_pubsub.erl @@ -26,7 +26,7 @@ -include("emqtt_log.hrl"). --include("emqtt_internal.hrl"). +-include("emqtt_topic.hrl"). -include_lib("stdlib/include/qlc.hrl"). @@ -174,7 +174,6 @@ handle_info({'DOWN', Mon, _Type, _Object, _Info}, State) -> undefined -> ?ERROR("unexpected 'DOWN': ~p", [Mon]); SubPid -> - %?INFO("subscriber DOWN: ~p", [SubPid]), erase({submon, Mon}), erase({subscriber, SubPid}), Subs = ets:match_object(topic_subscriber, #topic_subscriber{subpid=SubPid, _='_'}), diff --git a/apps/emqtt/src/emqtt_registry.erl b/apps/emqtt/src/emqtt_registry.erl deleted file mode 100644 index 1c982c260..000000000 --- a/apps/emqtt/src/emqtt_registry.erl +++ /dev/null @@ -1,86 +0,0 @@ - -%%TODO: SHOULD BE REPLACED BY emqtt_cm.erl...... - --module(emqtt_registry). - --include("emqtt.hrl"). - --include("emqtt_log.hrl"). - --export([start_link/0, - size/0, - register/2, - unregister/1]). - --behaviour(gen_server). - --export([init/1, - handle_call/3, - handle_cast/2, - handle_info/2, - terminate/2, - code_change/3]). - --record(state, {}). - --define(SERVER, ?MODULE). - -%%---------------------------------------------------------------------------- - -start_link() -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). - -size() -> - ets:info(client, size). - -register(ClientId, Pid) -> - gen_server:cast(?SERVER, {register, ClientId, Pid}). - -unregister(ClientId) -> - gen_server:cast(?SERVER, {unregister, ClientId}). - -%%---------------------------------------------------------------------------- - -init([]) -> - ets:new(client, [set, protected, named_table]), - ?INFO("~p is started.", [?MODULE]), - {ok, #state{}}. % clientid -> {pid, monitor} - -%%-------------------------------------------------------------------------- -handle_call(Req, _From, State) -> - {stop, {badreq, Req}, State}. - -handle_cast({register, ClientId, Pid}, State) -> - case ets:lookup(client, ClientId) of - [{_, {OldPid, MRef}}] -> - catch gen_server:call(OldPid, duplicate_id), - erlang:demonitor(MRef); - [] -> - ignore - end, - ets:insert(client, {ClientId, {Pid, erlang:monitor(process, Pid)}}), - {noreply, State}; - -handle_cast({unregister, ClientId}, State) -> - case ets:lookup(client, ClientId) of - [{_, {_Pid, MRef}}] -> - erlang:demonitor(MRef), - ets:delete(client, ClientId); - [] -> - ignore - end, - {noreply, State}; - -handle_cast(Msg, State) -> - {stop, {badmsg, Msg}, State}. - -handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) -> - ets:match_delete(client, {'_', {DownPid, MRef}}), - {noreply, State}. - -terminate(_Reason, _State) -> - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - diff --git a/apps/emqtt/src/emqtt_topic.erl b/apps/emqtt/src/emqtt_topic.erl index e82b02956..b40fc3552 100644 --- a/apps/emqtt/src/emqtt_topic.erl +++ b/apps/emqtt/src/emqtt_topic.erl @@ -45,7 +45,7 @@ %% There can be any number of root nodes; that is, there can be any number of topic trees. %% ------------------------------------------------------------------------ --include("emqtt_internal.hrl"). +-include("emqtt_topic.hrl"). -export([new/1, type/1, diff --git a/rel/files/app.config b/rel/files/app.config index 6524fdb02..3521665f8 100644 --- a/rel/files/app.config +++ b/rel/files/app.config @@ -36,12 +36,11 @@ {listen, [ {mqtt, 1883, [ {max_conns, 1024}, - {acceptor_pool, 2} + {acceptor_pool, 4} ]}, {http, 8883, [ {max_conns, 512}, - {acceptor_pool, 1}, - {auth, {"user", "passwd"}} + {acceptor_pool, 1} ]} ]} ]}