queue, router, sm

This commit is contained in:
Ery Lee 2015-01-06 21:46:12 +08:00
parent f1c7185f52
commit b44511af50
7 changed files with 210 additions and 4 deletions

View File

@ -77,6 +77,14 @@ start_servers(Sup) ->
{"emqtt auth", emqtt_auth},
{"emqtt retained", emqtt_retained},
{"emqtt pubsub", emqtt_pubsub},
{"emqtt router", emqtt_router},
{"emqtt queue supervisor", fun() ->
Mod = emqtt_queue_sup,
supervisor:start_child(Sup,
{Mod,
{Mod, start_link, []},
permanent, 1000, supervisor, [Mod]})
end},
{"emqtt monitor", emqtt_monitor}
]).

View File

@ -61,11 +61,12 @@ go(Pid, Sock) ->
gen_server:call(Pid, {go, Sock}).
init([Sock]) ->
io:format("client is created: ~p~n", [self()]),
{ok, #conn_state{socket = Sock}, hibernate}.
handle_call({go, Sock}, _From, State = #conn_state{socket = Sock}) ->
{ok, ConnStr} = emqtt_net:connection_string(Sock, inbound),
lager:debug("conn from ~s", [ConnStr]),
io:format("conn from ~s~n", [ConnStr]),
{reply, ok,
control_throttle(
#conn_state{ socket = Sock,
@ -126,7 +127,8 @@ handle_info(Info, State) ->
lager:error("badinfo :~p",[Info]),
{stop, {badinfo, Info}, State}.
terminate(_Reason, #conn_state{proto_state = unefined}) ->
terminate(Reason, #conn_state{proto_state = unefined}) ->
io:format("client terminated: ~p, reason: ~p~n", [self(), Reason]),
%%TODO: fix keep_alive...
%%emqtt_keep_alive:cancel(KeepAlive),
%emqtt_protocol:client_terminated(ProtoState),

View File

@ -0,0 +1,73 @@
%%-----------------------------------------------------------------------------
%% Copyright (c) 2014, Feng Lee <feng@slimchat.io>
%%
%% Permission is hereby granted, free of charge, to any person obtaining a copy
%% of this software and associated documentation files (the "Software"), to deal
%% in the Software without restriction, including without limitation the rights
%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%% copies of the Software, and to permit persons to whom the Software is
%% furnished to do so, subject to the following conditions:
%%
%% The above copyright notice and this permission notice shall be included in all
%% copies or substantial portions of the Software.
%%
%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%% SOFTWARE.
%%------------------------------------------------------------------------------
-module(emqtt_queue).
-behaviour(gen_server).
-define(SERVER, ?MODULE).
%% ------------------------------------------------------------------
%% API Function Exports
%% ------------------------------------------------------------------
-export([start_link/0]).
%% ------------------------------------------------------------------
%% gen_server Function Exports
%% ------------------------------------------------------------------
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
%% ------------------------------------------------------------------
%% API Function Definitions
%% ------------------------------------------------------------------
start_link() ->
gen_server:start_link(?MODULE, [], []).
%% ------------------------------------------------------------------
%% gen_server Function Definitions
%% ------------------------------------------------------------------
init(Args) ->
{ok, Args}.
handle_call(_Request, _From, State) ->
{reply, ok, State}.
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%% ------------------------------------------------------------------
%% Internal Function Definitions
%% ------------------------------------------------------------------

View File

@ -0,0 +1,41 @@
%%-----------------------------------------------------------------------------
%% Copyright (c) 2014, Feng Lee <feng@slimchat.io>
%%
%% Permission is hereby granted, free of charge, to any person obtaining a copy
%% of this software and associated documentation files (the "Software"), to deal
%% in the Software without restriction, including without limitation the rights
%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%% copies of the Software, and to permit persons to whom the Software is
%% furnished to do so, subject to the following conditions:
%%
%% The above copyright notice and this permission notice shall be included in all
%% copies or substantial portions of the Software.
%%
%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%% SOFTWARE.
%%------------------------------------------------------------------------------
-module(emqtt_queue_sup).
-author('feng@slimchat.io').
-behavior(supervisor).
-export([start_link/0, start_queue/0, init/1]).
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
start_queue() ->
supervisor:start_child(?MODULE, []).
init([]) ->
{ok, {{simple_one_for_one, 0, 1},
[{queue, {emqtt_queue, start_link, []},
transient, 10000, worker, [emqtt_queue]}]}}.

View File

@ -27,18 +27,65 @@
-include("emqtt_frame.hrl").
-export([route/1]).
-behaviour(gen_server).
-define(SERVER, ?MODULE).
%% ------------------------------------------------------------------
%% API Function Exports
%% ------------------------------------------------------------------
-export([start_link/0]).
%%Router Chain-->
%%--->In
%%Out<---
-export([route/1]).
%% ------------------------------------------------------------------
%% gen_server Function Exports
%% ------------------------------------------------------------------
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
%% ------------------------------------------------------------------
%% API Function Definitions
%% ------------------------------------------------------------------
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
-spec route(Msg :: mqtt_msg()) -> any().
route(Msg) ->
emqtt_pubsub:publish(retained(Msg)).
%% ------------------------------------------------------------------
%% gen_server Function Definitions
%% ------------------------------------------------------------------
init(Args) ->
{ok, Args}.
handle_call(_Request, _From, State) ->
{reply, ok, State}.
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%% ------------------------------------------------------------------
%% Internal Function Definitions
%% ------------------------------------------------------------------
retained(Msg = #mqtt_msg{retain = true, topic = Topic}) ->
emqtt_retained:insert(Topic, Msg), Msg;
retained(Msg) -> Msg.

View File

@ -0,0 +1,5 @@
-module(emqtt_sm).
%%emqtt session manager...
%%cleanSess: true | false

30
doc/design.md Normal file
View File

@ -0,0 +1,30 @@
# eMQTT Desgin Guide
## KeepAlive
## Retained
## QOS1
## QOS2
## Durable Subscriptions
Durable Sub:
Client->Queue->Router->Queue->Client
Normal Sub:
Client->Router->Client
Router to register queues
## Topic Tree
## Cluster
## Bridge
## Offline Message