diff --git a/.gitignore b/.gitignore new file mode 100644 index 000000000..b191b86db --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +ebin/* diff --git a/Makefile b/Makefile new file mode 100644 index 000000000..e577ac1ca --- /dev/null +++ b/Makefile @@ -0,0 +1,7 @@ +all: compile + +run: compile + erl -pa ebin -config etc/emqtt.config -s emqtt_app start + +compile: + rebar compile diff --git a/TODO b/TODO new file mode 100644 index 000000000..851d9ac39 --- /dev/null +++ b/TODO @@ -0,0 +1 @@ +1. Topic Trie diff --git a/docs/cluster.md b/docs/cluster.md new file mode 100644 index 000000000..281b1e700 --- /dev/null +++ b/docs/cluster.md @@ -0,0 +1,12 @@ + +Cluster Architecture +==================== + + +Topic: Memory Copy + +Topic ----------- Topic + +Subscriber: Local Node + + diff --git a/docs/desgin.md b/docs/desgin.md new file mode 100644 index 000000000..c3c98819c --- /dev/null +++ b/docs/desgin.md @@ -0,0 +1,13 @@ +Erlang TCP +========= + + +One Million TCP Connections +========================== + +http://news.ycombinator.com/item?id=3028547 + +http://www.kegel.com/c10k.html + +http://20bits.com/article/erlang-a-generalized-tcp-server + diff --git a/docs/topic.md b/docs/topic.md new file mode 100644 index 000000000..91755dec5 --- /dev/null +++ b/docs/topic.md @@ -0,0 +1,15 @@ + +Direct Topic + +or + +Wildchar Topic? + + +a/+/b +a/# +# + +Trie Data Structure + + diff --git a/etc/emqtt.config b/etc/emqtt.config new file mode 100644 index 000000000..0d974db0e --- /dev/null +++ b/etc/emqtt.config @@ -0,0 +1,15 @@ +%% -*- mode: erlang;erlang-indent-level: 4;indent-tabs-mode: nil -*- +%% ex: ft=erlang ts=4 sw=4 et +[{kernel, + [{start_timer, true}, + {start_pg2, true} + ]}, + {sasl, [ + {sasl_error_logger, {file, "log/emqtt_sasl.log"}} + ]}, + {mnesia, [ + {dir, "var/mnesia"} + ]} +]. + + diff --git a/include/emqtt.hrl b/include/emqtt.hrl new file mode 100644 index 000000000..fc5f70e56 --- /dev/null +++ b/include/emqtt.hrl @@ -0,0 +1,10 @@ + + +-record(direct_topic, {name, node}). + +-record(wildcard_topic, {words, node}). + +-record(subscriber, {topic, pid}). + + + diff --git a/log/.placeholder b/log/.placeholder new file mode 100644 index 000000000..e69de29bb diff --git a/src/.emqtt_topic.erl.swp b/src/.emqtt_topic.erl.swp new file mode 100644 index 000000000..a5e73750e Binary files /dev/null and b/src/.emqtt_topic.erl.swp differ diff --git a/src/emqtt.app.src b/src/emqtt.app.src new file mode 100644 index 000000000..63d024b5e --- /dev/null +++ b/src/emqtt.app.src @@ -0,0 +1,12 @@ +{application, emqtt, + [ + {description, ""}, + {vsn, "1"}, + {registered, []}, + {applications, [ + kernel, + stdlib + ]}, + {mod, { emqtt_app, []}}, + {env, []} + ]}. diff --git a/src/emqtt_app.erl b/src/emqtt_app.erl new file mode 100644 index 000000000..efd7cc6dd --- /dev/null +++ b/src/emqtt_app.erl @@ -0,0 +1,32 @@ +-module(emqtt_app). + +-export([start/0]). + +-behaviour(application). + +%% Application callbacks +-export([start/2, stop/1]). + +-define(APPS, [sasl, mnesia, emqtt]). + +start() -> + [start_app(App) || App <- ?APPS]. + +start_app(mnesia) -> + mnesia:create_schema([node()]), + mnesia:start(); + +start_app(App) -> + application:start(App). + + +%% =================================================================== +%% Application callbacks +%% =================================================================== + +start(_StartType, _StartArgs) -> + emqtt_sup:start_link(). + +stop(_State) -> + ok. + diff --git a/src/emqtt_client.erl b/src/emqtt_client.erl new file mode 100644 index 000000000..bd83b5f7b --- /dev/null +++ b/src/emqtt_client.erl @@ -0,0 +1,5 @@ +%simulate a mqtt connection +-module(emqtt_client). + + + diff --git a/src/emqtt_client_sup.erl b/src/emqtt_client_sup.erl new file mode 100644 index 000000000..ce0f47441 --- /dev/null +++ b/src/emqtt_client_sup.erl @@ -0,0 +1 @@ +-module(emqtt_client_sup). diff --git a/src/emqtt_frame.erl b/src/emqtt_frame.erl new file mode 100644 index 000000000..102310f9a --- /dev/null +++ b/src/emqtt_frame.erl @@ -0,0 +1 @@ +-module(emqtt_frame). diff --git a/src/emqtt_lib.erl b/src/emqtt_lib.erl new file mode 100644 index 000000000..44d8566ae --- /dev/null +++ b/src/emqtt_lib.erl @@ -0,0 +1,2 @@ +-module(emqtt_lib). + diff --git a/src/emqtt_reader.erl b/src/emqtt_reader.erl new file mode 100644 index 000000000..a91f0dc82 --- /dev/null +++ b/src/emqtt_reader.erl @@ -0,0 +1,3 @@ +%tcp data reader +-module(emqtt_reader). + diff --git a/src/emqtt_router.erl b/src/emqtt_router.erl new file mode 100644 index 000000000..57a836da5 --- /dev/null +++ b/src/emqtt_router.erl @@ -0,0 +1,39 @@ +-module(emqtt_router). + +-include("emqtt.hrl"). + +-export([start_link/0]). + +-behaviour(gen_server). + +-export([init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3]). + +-record(state,{}). + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +init([]) -> + {ok, #state{}}. + +handle_call(Req, _From, State) -> + {stop, {badreq, Req}, State}. + +handle_cast(Msg, State) -> + {stop, {badmsg, Msg}, State}. + +handle_info(Info, State) -> + {stop, {badinfo, Info}, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, _State, _Extra) -> + ok. + + diff --git a/src/emqtt_subscriber.erl b/src/emqtt_subscriber.erl new file mode 100644 index 000000000..5dfb74414 --- /dev/null +++ b/src/emqtt_subscriber.erl @@ -0,0 +1,42 @@ +-module(emqtt_subscriber). + +-include("emqtt.hrl"). + +-export([start_link/0]). + +-behaviour(gen_server). + +-export([init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3]). + +-record(state,{}). + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +init([]) -> + ets:new(subscriber, [bag, protected, {keypos, 2}]), + {ok, #state{}}. + +handle_call(Req, _From, State) -> + {stop, {badreq, Req}, State}. + +handle_cast(Msg, State) -> + {stop, {badmsg, Msg}, State}. + +handle_info(Info, State) -> + {stop, {badinfo, Info}, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, _State, _Extra) -> + ok. + + + + diff --git a/src/emqtt_sup.erl b/src/emqtt_sup.erl new file mode 100644 index 000000000..c00618cda --- /dev/null +++ b/src/emqtt_sup.erl @@ -0,0 +1,32 @@ +-module(emqtt_sup). + +-include("emqtt.hrl"). + +-behaviour(supervisor). + +%% API +-export([start_link/0]). + +%% Supervisor callbacks +-export([init/1]). + +%% Helper macro for declaring children of supervisor +-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}). + +%% =================================================================== +%% API functions +%% =================================================================== + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +%% =================================================================== +%% Supervisor callbacks +%% =================================================================== + +init([]) -> + {ok, { {one_for_all, 5, 10}, [ + ?CHILD(emqtt_topic, worker), + ?CHILD(emqtt_router, worker) + ]} }. + diff --git a/src/emqtt_topic.erl b/src/emqtt_topic.erl new file mode 100644 index 000000000..9cdbedecd --- /dev/null +++ b/src/emqtt_topic.erl @@ -0,0 +1,115 @@ + + +-module(emqtt_topic). + +-include("emqtt.hrl"). + +-export([start_link/0, + match/1, + insert/1, + delete/1]). + +-behaviour(gen_server). + +-export([init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3]). + +-record(state, {}). + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +match(Topic) when is_binary(Topic) -> + DirectMatches = mnesia:dirty_read(direct_topic, Topic), + Words = topic_split(Topic), + WildcardMatches = lists:append([ + mnesia:dirty_read(wildcard_topic, Key) || + Key <- mnesia:dirty_all_keys(wildcard_topic), topic_match(Words, Key) + ]), + DirectMatches ++ WildcardMatches. + + +insert(Topic) when is_binary(Topic) -> + gen_server:call(?MODULE, {insert, Topic}). + +delete(Topic) when is_binary(Topic) -> + gen_server:cast(?MODULE, {delete, Topic}). + +init([]) -> + {atomic, ok} = mnesia:create_table( + direct_topic, [ + {ram_copies, [node()]}, + {attributes, record_info(fields, direct_topic)}]), + {atomic, ok} = mnesia:create_table( + wildcard_topic, [ + {ram_copies, [node()]}, + {attributes, record_info(fields, wildcard_topic)}]), + {ok, #state{}}. + +handle_call({insert, Topic}, _From, State) -> + Words = topic_split(Topic), + Reply = + case topic_type(Words) of + direct -> + mnesia:dirty_write(#direct_topic{name=Topic}); + wildcard -> + mnesia:dirty_write(#wildcard_topic{words=Words}) + end, + {reply, Reply, State}; + +handle_call(Req, _From, State) -> + {stop, {badreq, Req}, State}. + +handle_cast({delete, Topic}, State) -> + Words = topic_split(Topic), + case topic_type(Words) of + direct -> + mnesia:dirty_delete(direct_topic, Topic); + wildcard -> + mnesia:direct_delete(wildcard_topic, Words) + end, + {noreply, State}; + +handle_cast(Msg, State) -> + {stop, {badmsg, Msg}, State}. + +handle_info(Info, State) -> + {stop, {badinfo, Info}, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, _State, _Extra) -> + ok. + +topic_type([]) -> + direct; +topic_type([<<"#">>]) -> + wildcard; +topic_type([<<"+">>|_T]) -> + wildcard; +topic_type([_|T]) -> + topic_type(T). + +topic_match([], []) -> + true; + +topic_match([H|T1], [H|T2]) -> + topic_match(T1, T2); + +topic_match([_H|T1], [<<"+">>|T2]) -> + topic_match(T1, T2); + +topic_match(_, [<<"#">>]) -> + true; + +topic_match([], [_H|_T2]) -> + false. + +topic_split(S) -> + binary:split(S, [<<"/">>], [global]). +