From cb04c9c7e572ad0917fff949b0271a5d007a4072 Mon Sep 17 00:00:00 2001 From: Ery Lee Date: Wed, 19 Dec 2012 13:42:03 +0800 Subject: [PATCH] add file --- .gitignore | 1 + Makefile | 7 +++ TODO | 1 + docs/cluster.md | 12 ++++ docs/desgin.md | 13 +++++ docs/topic.md | 15 +++++ etc/emqtt.config | 15 +++++ include/emqtt.hrl | 10 ++++ log/.placeholder | 0 src/.emqtt_topic.erl.swp | Bin 0 -> 12288 bytes src/emqtt.app.src | 12 ++++ src/emqtt_app.erl | 32 +++++++++++ src/emqtt_client.erl | 5 ++ src/emqtt_client_sup.erl | 1 + src/emqtt_frame.erl | 1 + src/emqtt_lib.erl | 2 + src/emqtt_reader.erl | 3 + src/emqtt_router.erl | 39 +++++++++++++ src/emqtt_subscriber.erl | 42 ++++++++++++++ src/emqtt_sup.erl | 32 +++++++++++ src/emqtt_topic.erl | 115 +++++++++++++++++++++++++++++++++++++++ 21 files changed, 358 insertions(+) create mode 100644 .gitignore create mode 100644 Makefile create mode 100644 TODO create mode 100644 docs/cluster.md create mode 100644 docs/desgin.md create mode 100644 docs/topic.md create mode 100644 etc/emqtt.config create mode 100644 include/emqtt.hrl create mode 100644 log/.placeholder create mode 100644 src/.emqtt_topic.erl.swp create mode 100644 src/emqtt.app.src create mode 100644 src/emqtt_app.erl create mode 100644 src/emqtt_client.erl create mode 100644 src/emqtt_client_sup.erl create mode 100644 src/emqtt_frame.erl create mode 100644 src/emqtt_lib.erl create mode 100644 src/emqtt_reader.erl create mode 100644 src/emqtt_router.erl create mode 100644 src/emqtt_subscriber.erl create mode 100644 src/emqtt_sup.erl create mode 100644 src/emqtt_topic.erl diff --git a/.gitignore b/.gitignore new file mode 100644 index 000000000..b191b86db --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +ebin/* diff --git a/Makefile b/Makefile new file mode 100644 index 000000000..e577ac1ca --- /dev/null +++ b/Makefile @@ -0,0 +1,7 @@ +all: compile + +run: compile + erl -pa ebin -config etc/emqtt.config -s emqtt_app start + +compile: + rebar compile diff --git a/TODO b/TODO new file mode 100644 index 000000000..851d9ac39 --- /dev/null +++ b/TODO @@ -0,0 +1 @@ +1. Topic Trie diff --git a/docs/cluster.md b/docs/cluster.md new file mode 100644 index 000000000..281b1e700 --- /dev/null +++ b/docs/cluster.md @@ -0,0 +1,12 @@ + +Cluster Architecture +==================== + + +Topic: Memory Copy + +Topic ----------- Topic + +Subscriber: Local Node + + diff --git a/docs/desgin.md b/docs/desgin.md new file mode 100644 index 000000000..c3c98819c --- /dev/null +++ b/docs/desgin.md @@ -0,0 +1,13 @@ +Erlang TCP +========= + + +One Million TCP Connections +========================== + +http://news.ycombinator.com/item?id=3028547 + +http://www.kegel.com/c10k.html + +http://20bits.com/article/erlang-a-generalized-tcp-server + diff --git a/docs/topic.md b/docs/topic.md new file mode 100644 index 000000000..91755dec5 --- /dev/null +++ b/docs/topic.md @@ -0,0 +1,15 @@ + +Direct Topic + +or + +Wildchar Topic? + + +a/+/b +a/# +# + +Trie Data Structure + + diff --git a/etc/emqtt.config b/etc/emqtt.config new file mode 100644 index 000000000..0d974db0e --- /dev/null +++ b/etc/emqtt.config @@ -0,0 +1,15 @@ +%% -*- mode: erlang;erlang-indent-level: 4;indent-tabs-mode: nil -*- +%% ex: ft=erlang ts=4 sw=4 et +[{kernel, + [{start_timer, true}, + {start_pg2, true} + ]}, + {sasl, [ + {sasl_error_logger, {file, "log/emqtt_sasl.log"}} + ]}, + {mnesia, [ + {dir, "var/mnesia"} + ]} +]. + + diff --git a/include/emqtt.hrl b/include/emqtt.hrl new file mode 100644 index 000000000..fc5f70e56 --- /dev/null +++ b/include/emqtt.hrl @@ -0,0 +1,10 @@ + + +-record(direct_topic, {name, node}). + +-record(wildcard_topic, {words, node}). + +-record(subscriber, {topic, pid}). + + + diff --git a/log/.placeholder b/log/.placeholder new file mode 100644 index 000000000..e69de29bb diff --git a/src/.emqtt_topic.erl.swp b/src/.emqtt_topic.erl.swp new file mode 100644 index 0000000000000000000000000000000000000000..a5e73750e123ca0fb1cebde876c47e0724421d9c GIT binary patch literal 12288 zcmeI2UuYaf9LJ~9YF88OACMv_V}i0r_U@WQQPU(+NvhSxN|H8|5SG2&NwVDD?qzn< z^t_yZD5&6@|FHU?;)72rib!8XUlpYOK_4uA6ZAzyw7yyW&d%=L?X@Ns@ue^apX}bw z{C@MB@9#G=3E9&A?J}NuX&7#hK)b6U&PAplx05< z1m*lemlwRiw!Dnt6S8A*F5u;vs%5hxtE@y(G32(qGozsD+6Bh5b{Qw?i&8)-uyqAi zqs9Eay<~W3aKA9_@4KDu-E(Z~ew2k$Kq;UUPzopolmbctrGQdEDR9*)5LLI4ClUQM z$qd5ex~=1yd{q~vfKosypcGIFC;ac);g-0uENd!{9c6*AmE|J%oG#J_nzGSHWq(KpyM_zuiE{x8MwT z4m=A^fKf0E_JJO7{(3^b1h0XYz&fad6JP|~2ljxA8X-S`bKq_8GFSo@coa;5A+Q_l z0)Onp_~0Ax5qKZG2c7{>gQtKG7&rnBf;+%1Uun*h}wu1}EsmQC~9dH^v2qwV<=mDSYAmmeU7Mua7wO@ zEFHG;Uww%n#5A`!!!(R;Y`f{RMFqDcjTPC;`n*V%`OvyKJG2lhNVJHrvI9xlDsd-M z=C)ttHti(dYBOc??4UtshjLjP;dFLTct=$Xz;eB9giVc8TUw#CQe87NPGnv%;X+H|bLkv5pK{FIkT3F9rrZjGqmI#{L=w zf^i&=nW9Y>O-ak;)|GI{pi|*e%a+pA+P1<-M_Y%nA8=OnYUyAHh{k-!AevV3S?F3L zQVG*x9z*=8#PY>QgW}CdC(Mzi^jM}eV;j{{)5KA{W1yr&m7uB;rQ_v|2pN?-N}M|B zoCb?Ti*M;z(keG4>B7j&uP)7F*MwkW@pKxS=$1HW*RAeb6*d-6ypa(PqHmyH94;N}Qbx?b?)o9) zQA@@vX%UnS+8>INuQ#BdCO&m5inv=^jaX<TjiH#sz#n|wY-t9uUSem`gm~M;y6e*y>#PA4#;>X>oO-;E<7?l*Dve!_Jg+!xkUTq84LGV* z)NFCkz+bB+wrrhrgH^7#s!WbA%d-G%G~%}DuQjLQe&N;iZEqE;y)S;Z YzOc-_KH)T937l1|;rOXkUOXoM0L5$5H~;_u literal 0 HcmV?d00001 diff --git a/src/emqtt.app.src b/src/emqtt.app.src new file mode 100644 index 000000000..63d024b5e --- /dev/null +++ b/src/emqtt.app.src @@ -0,0 +1,12 @@ +{application, emqtt, + [ + {description, ""}, + {vsn, "1"}, + {registered, []}, + {applications, [ + kernel, + stdlib + ]}, + {mod, { emqtt_app, []}}, + {env, []} + ]}. diff --git a/src/emqtt_app.erl b/src/emqtt_app.erl new file mode 100644 index 000000000..efd7cc6dd --- /dev/null +++ b/src/emqtt_app.erl @@ -0,0 +1,32 @@ +-module(emqtt_app). + +-export([start/0]). + +-behaviour(application). + +%% Application callbacks +-export([start/2, stop/1]). + +-define(APPS, [sasl, mnesia, emqtt]). + +start() -> + [start_app(App) || App <- ?APPS]. + +start_app(mnesia) -> + mnesia:create_schema([node()]), + mnesia:start(); + +start_app(App) -> + application:start(App). + + +%% =================================================================== +%% Application callbacks +%% =================================================================== + +start(_StartType, _StartArgs) -> + emqtt_sup:start_link(). + +stop(_State) -> + ok. + diff --git a/src/emqtt_client.erl b/src/emqtt_client.erl new file mode 100644 index 000000000..bd83b5f7b --- /dev/null +++ b/src/emqtt_client.erl @@ -0,0 +1,5 @@ +%simulate a mqtt connection +-module(emqtt_client). + + + diff --git a/src/emqtt_client_sup.erl b/src/emqtt_client_sup.erl new file mode 100644 index 000000000..ce0f47441 --- /dev/null +++ b/src/emqtt_client_sup.erl @@ -0,0 +1 @@ +-module(emqtt_client_sup). diff --git a/src/emqtt_frame.erl b/src/emqtt_frame.erl new file mode 100644 index 000000000..102310f9a --- /dev/null +++ b/src/emqtt_frame.erl @@ -0,0 +1 @@ +-module(emqtt_frame). diff --git a/src/emqtt_lib.erl b/src/emqtt_lib.erl new file mode 100644 index 000000000..44d8566ae --- /dev/null +++ b/src/emqtt_lib.erl @@ -0,0 +1,2 @@ +-module(emqtt_lib). + diff --git a/src/emqtt_reader.erl b/src/emqtt_reader.erl new file mode 100644 index 000000000..a91f0dc82 --- /dev/null +++ b/src/emqtt_reader.erl @@ -0,0 +1,3 @@ +%tcp data reader +-module(emqtt_reader). + diff --git a/src/emqtt_router.erl b/src/emqtt_router.erl new file mode 100644 index 000000000..57a836da5 --- /dev/null +++ b/src/emqtt_router.erl @@ -0,0 +1,39 @@ +-module(emqtt_router). + +-include("emqtt.hrl"). + +-export([start_link/0]). + +-behaviour(gen_server). + +-export([init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3]). + +-record(state,{}). + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +init([]) -> + {ok, #state{}}. + +handle_call(Req, _From, State) -> + {stop, {badreq, Req}, State}. + +handle_cast(Msg, State) -> + {stop, {badmsg, Msg}, State}. + +handle_info(Info, State) -> + {stop, {badinfo, Info}, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, _State, _Extra) -> + ok. + + diff --git a/src/emqtt_subscriber.erl b/src/emqtt_subscriber.erl new file mode 100644 index 000000000..5dfb74414 --- /dev/null +++ b/src/emqtt_subscriber.erl @@ -0,0 +1,42 @@ +-module(emqtt_subscriber). + +-include("emqtt.hrl"). + +-export([start_link/0]). + +-behaviour(gen_server). + +-export([init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3]). + +-record(state,{}). + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +init([]) -> + ets:new(subscriber, [bag, protected, {keypos, 2}]), + {ok, #state{}}. + +handle_call(Req, _From, State) -> + {stop, {badreq, Req}, State}. + +handle_cast(Msg, State) -> + {stop, {badmsg, Msg}, State}. + +handle_info(Info, State) -> + {stop, {badinfo, Info}, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, _State, _Extra) -> + ok. + + + + diff --git a/src/emqtt_sup.erl b/src/emqtt_sup.erl new file mode 100644 index 000000000..c00618cda --- /dev/null +++ b/src/emqtt_sup.erl @@ -0,0 +1,32 @@ +-module(emqtt_sup). + +-include("emqtt.hrl"). + +-behaviour(supervisor). + +%% API +-export([start_link/0]). + +%% Supervisor callbacks +-export([init/1]). + +%% Helper macro for declaring children of supervisor +-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}). + +%% =================================================================== +%% API functions +%% =================================================================== + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +%% =================================================================== +%% Supervisor callbacks +%% =================================================================== + +init([]) -> + {ok, { {one_for_all, 5, 10}, [ + ?CHILD(emqtt_topic, worker), + ?CHILD(emqtt_router, worker) + ]} }. + diff --git a/src/emqtt_topic.erl b/src/emqtt_topic.erl new file mode 100644 index 000000000..9cdbedecd --- /dev/null +++ b/src/emqtt_topic.erl @@ -0,0 +1,115 @@ + + +-module(emqtt_topic). + +-include("emqtt.hrl"). + +-export([start_link/0, + match/1, + insert/1, + delete/1]). + +-behaviour(gen_server). + +-export([init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3]). + +-record(state, {}). + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +match(Topic) when is_binary(Topic) -> + DirectMatches = mnesia:dirty_read(direct_topic, Topic), + Words = topic_split(Topic), + WildcardMatches = lists:append([ + mnesia:dirty_read(wildcard_topic, Key) || + Key <- mnesia:dirty_all_keys(wildcard_topic), topic_match(Words, Key) + ]), + DirectMatches ++ WildcardMatches. + + +insert(Topic) when is_binary(Topic) -> + gen_server:call(?MODULE, {insert, Topic}). + +delete(Topic) when is_binary(Topic) -> + gen_server:cast(?MODULE, {delete, Topic}). + +init([]) -> + {atomic, ok} = mnesia:create_table( + direct_topic, [ + {ram_copies, [node()]}, + {attributes, record_info(fields, direct_topic)}]), + {atomic, ok} = mnesia:create_table( + wildcard_topic, [ + {ram_copies, [node()]}, + {attributes, record_info(fields, wildcard_topic)}]), + {ok, #state{}}. + +handle_call({insert, Topic}, _From, State) -> + Words = topic_split(Topic), + Reply = + case topic_type(Words) of + direct -> + mnesia:dirty_write(#direct_topic{name=Topic}); + wildcard -> + mnesia:dirty_write(#wildcard_topic{words=Words}) + end, + {reply, Reply, State}; + +handle_call(Req, _From, State) -> + {stop, {badreq, Req}, State}. + +handle_cast({delete, Topic}, State) -> + Words = topic_split(Topic), + case topic_type(Words) of + direct -> + mnesia:dirty_delete(direct_topic, Topic); + wildcard -> + mnesia:direct_delete(wildcard_topic, Words) + end, + {noreply, State}; + +handle_cast(Msg, State) -> + {stop, {badmsg, Msg}, State}. + +handle_info(Info, State) -> + {stop, {badinfo, Info}, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, _State, _Extra) -> + ok. + +topic_type([]) -> + direct; +topic_type([<<"#">>]) -> + wildcard; +topic_type([<<"+">>|_T]) -> + wildcard; +topic_type([_|T]) -> + topic_type(T). + +topic_match([], []) -> + true; + +topic_match([H|T1], [H|T2]) -> + topic_match(T1, T2); + +topic_match([_H|T1], [<<"+">>|T2]) -> + topic_match(T1, T2); + +topic_match(_, [<<"#">>]) -> + true; + +topic_match([], [_H|_T2]) -> + false. + +topic_split(S) -> + binary:split(S, [<<"/">>], [global]). +