diff --git a/apps/emqtt/src/emqtt_app.erl b/apps/emqtt/src/emqtt_app.erl index 3b9aeff9d..14c3bf29e 100644 --- a/apps/emqtt/src/emqtt_app.erl +++ b/apps/emqtt/src/emqtt_app.erl @@ -77,6 +77,14 @@ start_servers(Sup) -> {"emqtt auth", emqtt_auth}, {"emqtt retained", emqtt_retained}, {"emqtt pubsub", emqtt_pubsub}, + {"emqtt router", emqtt_router}, + {"emqtt queue supervisor", fun() -> + Mod = emqtt_queue_sup, + supervisor:start_child(Sup, + {Mod, + {Mod, start_link, []}, + permanent, 1000, supervisor, [Mod]}) + end}, {"emqtt monitor", emqtt_monitor} ]). diff --git a/apps/emqtt/src/emqtt_client.erl b/apps/emqtt/src/emqtt_client.erl index da692bbc2..e3e18d1b6 100644 --- a/apps/emqtt/src/emqtt_client.erl +++ b/apps/emqtt/src/emqtt_client.erl @@ -61,11 +61,12 @@ go(Pid, Sock) -> gen_server:call(Pid, {go, Sock}). init([Sock]) -> + io:format("client is created: ~p~n", [self()]), {ok, #conn_state{socket = Sock}, hibernate}. handle_call({go, Sock}, _From, State = #conn_state{socket = Sock}) -> {ok, ConnStr} = emqtt_net:connection_string(Sock, inbound), - lager:debug("conn from ~s", [ConnStr]), + io:format("conn from ~s~n", [ConnStr]), {reply, ok, control_throttle( #conn_state{ socket = Sock, @@ -126,7 +127,8 @@ handle_info(Info, State) -> lager:error("badinfo :~p",[Info]), {stop, {badinfo, Info}, State}. -terminate(_Reason, #conn_state{proto_state = unefined}) -> +terminate(Reason, #conn_state{proto_state = unefined}) -> + io:format("client terminated: ~p, reason: ~p~n", [self(), Reason]), %%TODO: fix keep_alive... %%emqtt_keep_alive:cancel(KeepAlive), %emqtt_protocol:client_terminated(ProtoState), diff --git a/apps/emqtt/src/emqtt_queue.erl b/apps/emqtt/src/emqtt_queue.erl new file mode 100644 index 000000000..da715a542 --- /dev/null +++ b/apps/emqtt/src/emqtt_queue.erl @@ -0,0 +1,73 @@ +%%----------------------------------------------------------------------------- +%% Copyright (c) 2014, Feng Lee +%% +%% Permission is hereby granted, free of charge, to any person obtaining a copy +%% of this software and associated documentation files (the "Software"), to deal +%% in the Software without restriction, including without limitation the rights +%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%% copies of the Software, and to permit persons to whom the Software is +%% furnished to do so, subject to the following conditions: +%% +%% The above copyright notice and this permission notice shall be included in all +%% copies or substantial portions of the Software. +%% +%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%% SOFTWARE. +%%------------------------------------------------------------------------------ +-module(emqtt_queue). + +-behaviour(gen_server). + +-define(SERVER, ?MODULE). + +%% ------------------------------------------------------------------ +%% API Function Exports +%% ------------------------------------------------------------------ + +-export([start_link/0]). + +%% ------------------------------------------------------------------ +%% gen_server Function Exports +%% ------------------------------------------------------------------ + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +%% ------------------------------------------------------------------ +%% API Function Definitions +%% ------------------------------------------------------------------ + +start_link() -> + gen_server:start_link(?MODULE, [], []). + +%% ------------------------------------------------------------------ +%% gen_server Function Definitions +%% ------------------------------------------------------------------ + +init(Args) -> + {ok, Args}. + +handle_call(_Request, _From, State) -> + {reply, ok, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%% ------------------------------------------------------------------ +%% Internal Function Definitions +%% ------------------------------------------------------------------ + diff --git a/apps/emqtt/src/emqtt_queue_sup.erl b/apps/emqtt/src/emqtt_queue_sup.erl new file mode 100644 index 000000000..3cfa25383 --- /dev/null +++ b/apps/emqtt/src/emqtt_queue_sup.erl @@ -0,0 +1,41 @@ +%%----------------------------------------------------------------------------- +%% Copyright (c) 2014, Feng Lee +%% +%% Permission is hereby granted, free of charge, to any person obtaining a copy +%% of this software and associated documentation files (the "Software"), to deal +%% in the Software without restriction, including without limitation the rights +%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%% copies of the Software, and to permit persons to whom the Software is +%% furnished to do so, subject to the following conditions: +%% +%% The above copyright notice and this permission notice shall be included in all +%% copies or substantial portions of the Software. +%% +%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%% SOFTWARE. +%%------------------------------------------------------------------------------ +-module(emqtt_queue_sup). + +-author('feng@slimchat.io'). + +-behavior(supervisor). + +-export([start_link/0, start_queue/0, init/1]). + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +start_queue() -> + supervisor:start_child(?MODULE, []). + +init([]) -> + {ok, {{simple_one_for_one, 0, 1}, + [{queue, {emqtt_queue, start_link, []}, + transient, 10000, worker, [emqtt_queue]}]}}. + + diff --git a/apps/emqtt/src/emqtt_router.erl b/apps/emqtt/src/emqtt_router.erl index 6b5c48c9b..43cc16268 100644 --- a/apps/emqtt/src/emqtt_router.erl +++ b/apps/emqtt/src/emqtt_router.erl @@ -27,18 +27,65 @@ -include("emqtt_frame.hrl"). --export([route/1]). +-behaviour(gen_server). + +-define(SERVER, ?MODULE). + +%% ------------------------------------------------------------------ +%% API Function Exports +%% ------------------------------------------------------------------ + +-export([start_link/0]). %%Router Chain--> %%--->In %%Out<--- +-export([route/1]). + +%% ------------------------------------------------------------------ +%% gen_server Function Exports +%% ------------------------------------------------------------------ + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +%% ------------------------------------------------------------------ +%% API Function Definitions +%% ------------------------------------------------------------------ + +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). -spec route(Msg :: mqtt_msg()) -> any(). route(Msg) -> emqtt_pubsub:publish(retained(Msg)). +%% ------------------------------------------------------------------ +%% gen_server Function Definitions +%% ------------------------------------------------------------------ + +init(Args) -> + {ok, Args}. + +handle_call(_Request, _From, State) -> + {reply, ok, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%% ------------------------------------------------------------------ +%% Internal Function Definitions +%% ------------------------------------------------------------------ retained(Msg = #mqtt_msg{retain = true, topic = Topic}) -> emqtt_retained:insert(Topic, Msg), Msg; retained(Msg) -> Msg. - diff --git a/apps/emqtt/src/emqtt_sm.erl b/apps/emqtt/src/emqtt_sm.erl new file mode 100644 index 000000000..cbcb477d9 --- /dev/null +++ b/apps/emqtt/src/emqtt_sm.erl @@ -0,0 +1,5 @@ +-module(emqtt_sm). + +%%emqtt session manager... + +%%cleanSess: true | false diff --git a/doc/design.md b/doc/design.md new file mode 100644 index 000000000..9e775a095 --- /dev/null +++ b/doc/design.md @@ -0,0 +1,30 @@ +# eMQTT Desgin Guide + +## KeepAlive + +## Retained + +## QOS1 + +## QOS2 + +## Durable Subscriptions + +Durable Sub: + +Client->Queue->Router->Queue->Client + +Normal Sub: + +Client->Router->Client + +Router to register queues + +## Topic Tree + +## Cluster + +## Bridge + +## Offline Message +