This commit is contained in:
Ery Lee 2012-12-19 13:42:03 +08:00
parent 11db16725e
commit cb04c9c7e5
21 changed files with 358 additions and 0 deletions

1
.gitignore vendored Normal file
View File

@ -0,0 +1 @@
ebin/*

7
Makefile Normal file
View File

@ -0,0 +1,7 @@
all: compile
run: compile
erl -pa ebin -config etc/emqtt.config -s emqtt_app start
compile:
rebar compile

1
TODO Normal file
View File

@ -0,0 +1 @@
1. Topic Trie

12
docs/cluster.md Normal file
View File

@ -0,0 +1,12 @@
Cluster Architecture
====================
Topic: Memory Copy
Topic ----------- Topic
Subscriber: Local Node

13
docs/desgin.md Normal file
View File

@ -0,0 +1,13 @@
Erlang TCP
=========
One Million TCP Connections
==========================
http://news.ycombinator.com/item?id=3028547
http://www.kegel.com/c10k.html
http://20bits.com/article/erlang-a-generalized-tcp-server

15
docs/topic.md Normal file
View File

@ -0,0 +1,15 @@
Direct Topic
or
Wildchar Topic?
a/+/b
a/#
#
Trie Data Structure

15
etc/emqtt.config Normal file
View File

@ -0,0 +1,15 @@
%% -*- mode: erlang;erlang-indent-level: 4;indent-tabs-mode: nil -*-
%% ex: ft=erlang ts=4 sw=4 et
[{kernel,
[{start_timer, true},
{start_pg2, true}
]},
{sasl, [
{sasl_error_logger, {file, "log/emqtt_sasl.log"}}
]},
{mnesia, [
{dir, "var/mnesia"}
]}
].

10
include/emqtt.hrl Normal file
View File

@ -0,0 +1,10 @@
-record(direct_topic, {name, node}).
-record(wildcard_topic, {words, node}).
-record(subscriber, {topic, pid}).

0
log/.placeholder Normal file
View File

BIN
src/.emqtt_topic.erl.swp Normal file

Binary file not shown.

12
src/emqtt.app.src Normal file
View File

@ -0,0 +1,12 @@
{application, emqtt,
[
{description, ""},
{vsn, "1"},
{registered, []},
{applications, [
kernel,
stdlib
]},
{mod, { emqtt_app, []}},
{env, []}
]}.

32
src/emqtt_app.erl Normal file
View File

@ -0,0 +1,32 @@
-module(emqtt_app).
-export([start/0]).
-behaviour(application).
%% Application callbacks
-export([start/2, stop/1]).
-define(APPS, [sasl, mnesia, emqtt]).
start() ->
[start_app(App) || App <- ?APPS].
start_app(mnesia) ->
mnesia:create_schema([node()]),
mnesia:start();
start_app(App) ->
application:start(App).
%% ===================================================================
%% Application callbacks
%% ===================================================================
start(_StartType, _StartArgs) ->
emqtt_sup:start_link().
stop(_State) ->
ok.

5
src/emqtt_client.erl Normal file
View File

@ -0,0 +1,5 @@
%simulate a mqtt connection
-module(emqtt_client).

1
src/emqtt_client_sup.erl Normal file
View File

@ -0,0 +1 @@
-module(emqtt_client_sup).

1
src/emqtt_frame.erl Normal file
View File

@ -0,0 +1 @@
-module(emqtt_frame).

2
src/emqtt_lib.erl Normal file
View File

@ -0,0 +1,2 @@
-module(emqtt_lib).

3
src/emqtt_reader.erl Normal file
View File

@ -0,0 +1,3 @@
%tcp data reader
-module(emqtt_reader).

39
src/emqtt_router.erl Normal file
View File

@ -0,0 +1,39 @@
-module(emqtt_router).
-include("emqtt.hrl").
-export([start_link/0]).
-behaviour(gen_server).
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3]).
-record(state,{}).
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
init([]) ->
{ok, #state{}}.
handle_call(Req, _From, State) ->
{stop, {badreq, Req}, State}.
handle_cast(Msg, State) ->
{stop, {badmsg, Msg}, State}.
handle_info(Info, State) ->
{stop, {badinfo, Info}, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, _State, _Extra) ->
ok.

42
src/emqtt_subscriber.erl Normal file
View File

@ -0,0 +1,42 @@
-module(emqtt_subscriber).
-include("emqtt.hrl").
-export([start_link/0]).
-behaviour(gen_server).
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3]).
-record(state,{}).
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
init([]) ->
ets:new(subscriber, [bag, protected, {keypos, 2}]),
{ok, #state{}}.
handle_call(Req, _From, State) ->
{stop, {badreq, Req}, State}.
handle_cast(Msg, State) ->
{stop, {badmsg, Msg}, State}.
handle_info(Info, State) ->
{stop, {badinfo, Info}, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, _State, _Extra) ->
ok.

32
src/emqtt_sup.erl Normal file
View File

@ -0,0 +1,32 @@
-module(emqtt_sup).
-include("emqtt.hrl").
-behaviour(supervisor).
%% API
-export([start_link/0]).
%% Supervisor callbacks
-export([init/1]).
%% Helper macro for declaring children of supervisor
-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}).
%% ===================================================================
%% API functions
%% ===================================================================
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
%% ===================================================================
%% Supervisor callbacks
%% ===================================================================
init([]) ->
{ok, { {one_for_all, 5, 10}, [
?CHILD(emqtt_topic, worker),
?CHILD(emqtt_router, worker)
]} }.

115
src/emqtt_topic.erl Normal file
View File

@ -0,0 +1,115 @@
-module(emqtt_topic).
-include("emqtt.hrl").
-export([start_link/0,
match/1,
insert/1,
delete/1]).
-behaviour(gen_server).
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3]).
-record(state, {}).
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
match(Topic) when is_binary(Topic) ->
DirectMatches = mnesia:dirty_read(direct_topic, Topic),
Words = topic_split(Topic),
WildcardMatches = lists:append([
mnesia:dirty_read(wildcard_topic, Key) ||
Key <- mnesia:dirty_all_keys(wildcard_topic), topic_match(Words, Key)
]),
DirectMatches ++ WildcardMatches.
insert(Topic) when is_binary(Topic) ->
gen_server:call(?MODULE, {insert, Topic}).
delete(Topic) when is_binary(Topic) ->
gen_server:cast(?MODULE, {delete, Topic}).
init([]) ->
{atomic, ok} = mnesia:create_table(
direct_topic, [
{ram_copies, [node()]},
{attributes, record_info(fields, direct_topic)}]),
{atomic, ok} = mnesia:create_table(
wildcard_topic, [
{ram_copies, [node()]},
{attributes, record_info(fields, wildcard_topic)}]),
{ok, #state{}}.
handle_call({insert, Topic}, _From, State) ->
Words = topic_split(Topic),
Reply =
case topic_type(Words) of
direct ->
mnesia:dirty_write(#direct_topic{name=Topic});
wildcard ->
mnesia:dirty_write(#wildcard_topic{words=Words})
end,
{reply, Reply, State};
handle_call(Req, _From, State) ->
{stop, {badreq, Req}, State}.
handle_cast({delete, Topic}, State) ->
Words = topic_split(Topic),
case topic_type(Words) of
direct ->
mnesia:dirty_delete(direct_topic, Topic);
wildcard ->
mnesia:direct_delete(wildcard_topic, Words)
end,
{noreply, State};
handle_cast(Msg, State) ->
{stop, {badmsg, Msg}, State}.
handle_info(Info, State) ->
{stop, {badinfo, Info}, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, _State, _Extra) ->
ok.
topic_type([]) ->
direct;
topic_type([<<"#">>]) ->
wildcard;
topic_type([<<"+">>|_T]) ->
wildcard;
topic_type([_|T]) ->
topic_type(T).
topic_match([], []) ->
true;
topic_match([H|T1], [H|T2]) ->
topic_match(T1, T2);
topic_match([_H|T1], [<<"+">>|T2]) ->
topic_match(T1, T2);
topic_match(_, [<<"#">>]) ->
true;
topic_match([], [_H|_T2]) ->
false.
topic_split(S) ->
binary:split(S, [<<"/">>], [global]).