new pubsub

This commit is contained in:
Feng 2015-12-04 17:36:09 +08:00
parent d6cf52fc70
commit 812779004f
11 changed files with 230 additions and 138 deletions

View File

@ -147,17 +147,17 @@
%% Default should be scheduler numbers
%% {pool_size, 8},
%% Subscription: disc | ram
%% Subscription: disc | ram | false
{subscription, ram},
%% Route shard
{route_shard, true},
{route_shard, false},
%% Route delay, false | integer
{route_delay, false},
%% Route aging time(seconds)
{route_aging, 10}
{route_aging, 5}
]},
%% Bridge

View File

@ -139,17 +139,17 @@
%% Default should be scheduler numbers
%% {pool_size, 8},
%% Subscription: disc | ram
%% Subscription: disc | ram | false
{subscription, ram},
%% Route shard
{route_shard, true},
{route_shard, false},
%% Route delay, false | integer
{route_delay, false},
%% Route aging time(seconds)
{route_aging, 10}
{route_aging, 5}
]},
%% Bridge

View File

@ -26,9 +26,9 @@
-module(emqttd).
-export([start/0, env/1, env/2,
open_listeners/1, close_listeners/1,
start_listeners/0, stop_listeners/0,
load_all_mods/0, is_mod_enabled/1,
is_running/1, ensure_pool/3]).
is_running/1]).
-define(MQTT_SOCKOPTS, [
binary,
@ -38,6 +38,8 @@
{nodelay, true}
]).
-define(APP, ?MODULE).
-type listener() :: {atom(), inet:port_number(), [esockd:option()]}.
%%------------------------------------------------------------------------------
@ -61,32 +63,34 @@ env(Group, Name) ->
proplists:get_value(Name, env(Group)).
%%------------------------------------------------------------------------------
%% @doc Open Listeners
%% @doc Start Listeners
%% @end
%%------------------------------------------------------------------------------
-spec open_listeners([listener()]) -> any().
open_listeners(Listeners) when is_list(Listeners) ->
[open_listener(Listener) || Listener <- Listeners].
-spec start_listeners() -> any().
start_listeners() ->
{ok, Listeners} = application:get_env(?APP, listeners),
lists:foreach(fun start_listener/1, Listeners).
%% open mqtt port
open_listener({mqtt, Port, Options}) ->
open_listener(mqtt, Port, Options);
%% Start mqtt listener
-spec start_listener(listener()) -> any().
start_listener({mqtt, Port, Options}) ->
start_listener(mqtt, Port, Options);
%% open mqtt(SSL) port
open_listener({mqtts, Port, Options}) ->
open_listener(mqtts, Port, Options);
%% Start mqtt(SSL) listener
start_listener({mqtts, Port, Options}) ->
start_listener(mqtts, Port, Options);
%% open http port
open_listener({http, Port, Options}) ->
%% Start http listener
start_listener({http, Port, Options}) ->
MFArgs = {emqttd_http, handle_request, []},
mochiweb:start_http(Port, Options, MFArgs);
%% open https port
open_listener({https, Port, Options}) ->
%% Start https listener
start_listener({https, Port, Options}) ->
MFArgs = {emqttd_http, handle_request, []},
mochiweb:start_http(Port, Options, MFArgs).
open_listener(Protocol, Port, Options) ->
start_listener(Protocol, Port, Options) ->
MFArgs = {emqttd_client, start_link, [env(mqtt)]},
esockd:open(Protocol, Port, merge_sockopts(Options) , MFArgs).
@ -96,14 +100,14 @@ merge_sockopts(Options) ->
emqttd_opts:merge(Options, [{sockopts, SockOpts}]).
%%------------------------------------------------------------------------------
%% @doc Close Listeners
%% @doc Stop Listeners
%% @end
%%------------------------------------------------------------------------------
-spec close_listeners([listener()]) -> any().
close_listeners(Listeners) when is_list(Listeners) ->
[close_listener(Listener) || Listener <- Listeners].
stop_listeners() ->
{ok, Listeners} = application:get_env(?APP, listeners),
lists:foreach(fun stop_listener/1, Listeners).
close_listener({Protocol, Port, _Options}) ->
stop_listener({Protocol, Port, _Options}) ->
esockd:close({Protocol, Port}).
load_all_mods() ->
@ -127,13 +131,3 @@ is_running(Node) ->
Pid when is_pid(Pid) -> true
end.
%%------------------------------------------------------------------------------
%% @doc Ensure gproc pool exist.
%% @end
%%------------------------------------------------------------------------------
ensure_pool(Pool, Type, Opts) ->
try gproc_pool:new(Pool, Type, Opts)
catch
error:exists -> ok
end.

View File

@ -49,7 +49,7 @@ start(_StartType, _StartArgs) ->
emqttd_cli:load(),
emqttd:load_all_mods(),
emqttd_plugins:load(),
start_listeners(),
emqttd:start_listeners(),
register(emqttd, self()),
print_vsn(),
{ok, Sup}.
@ -62,10 +62,6 @@ print_vsn() ->
{ok, Desc} = application:get_key(description),
?PRINT("~s ~s is running now~n", [Desc, Vsn]).
start_listeners() ->
{ok, Listeners} = application:get_env(listeners),
emqttd:open_listeners(Listeners).
start_servers(Sup) ->
Servers = [{"emqttd ctl", emqttd_ctl},
{"emqttd trace", emqttd_trace},
@ -132,15 +128,5 @@ worker_spec(M, F, A) ->
-spec stop(State :: term()) -> term().
stop(_State) ->
stop_listeners().
stop_listeners() ->
%% ensure that esockd applications is started?
case lists:keyfind(esockd, 1, application:which_applications()) of
false ->
ignore;
_Tuple ->
{ok, Listeners} = application:get_env(listeners),
emqttd:close_listeners(Listeners)
end.
catch emqttd:stop_listeners().

View File

@ -82,7 +82,7 @@ init([Node, SubTopic, Options]) ->
MQueue = emqttd_mqueue:new(qname(Node, SubTopic),
[{max_len, State#state.max_queue_len}],
emqttd_alarm:alarm_fun()),
emqttd_pubsub:subscribe(SubTopic, State#state.qos),
emqttd_pubsub:subscribe({SubTopic, State#state.qos}),
{ok, State#state{mqueue = MQueue}};
false ->
{stop, {cannot_connect, Node}}

View File

@ -51,11 +51,23 @@ sup_name(Pool) ->
list_to_atom(atom_to_list(Pool) ++ "_pool_sup").
init([Pool, Type, Size, {M, F, Args}]) ->
emqttd:ensure_pool(Pool, Type, [{size, Size}]),
ensure_pool(Pool, Type, [{size, Size}]),
{ok, {{one_for_one, 10, 3600}, [
begin
gproc_pool:add_worker(Pool, {Pool, I}, I),
ensure_pool_worker(Pool, {Pool, I}, I),
{{M, I}, {M, F, [Pool, I | Args]},
transient, 5000, worker, [M]}
end || I <- lists:seq(1, Size)]}}.
ensure_pool(Pool, Type, Opts) ->
try gproc_pool:new(Pool, Type, Opts)
catch
error:exists -> ok
end.
ensure_pool_worker(Pool, Name, Slot) ->
try gproc_pool:add_worker(Pool, Name, Slot)
catch
error:exists -> ok
end.

View File

@ -67,18 +67,16 @@
%%%=============================================================================
mnesia(boot) ->
ok = create_table(topic, ram_copies),
case env(subscription) of
disc -> ok = create_table(subscription, disc_copies);
ram -> ok = create_table(subscription, ram_copies);
false -> ok
end;
if_subscription(fun(RamOrDisc) ->
ok = create_table(subscription, RamOrDisc)
end);
mnesia(copy) ->
ok = emqttd_mnesia:copy_table(topic),
case env(subscription) of
false -> ok;
_ -> ok = emqttd_mnesia:copy_table(subscription)
end.
%% Only one disc_copy???
if_subscription(fun(_RamOrDisc) ->
ok = emqttd_mnesia:copy_table(subscription)
end).
%% Topic Table
create_table(topic, RamOrDisc) ->
@ -96,16 +94,27 @@ create_table(subscription, RamOrDisc) ->
{record_name, mqtt_subscription},
{attributes, record_info(fields, mqtt_subscription)}]).
if_subscription(Fun) ->
case env(subscription) of
disc -> Fun(disc_copies);
ram -> Fun(ram_copies);
false -> ok;
undefined -> ok
end.
env(Key) ->
case get({pubsub, Key}) of
undefined ->
Val = proplists:get_value(Key, emqttd_broker:env(pubsub)),
put({pubsub, Key}, Val),
Val;
cache_env(Key);
Val ->
Val
end.
cache_env(Key) ->
Val = emqttd_opts:g(Key, emqttd_broker:env(pubsub)),
put({pubsub, Key}, Val),
Val.
%%%=============================================================================
%%% API
%%%=============================================================================
@ -217,7 +226,8 @@ publish(Topic, Msg) when is_binary(Topic) ->
-spec match(Topic :: binary()) -> [mqtt_topic()].
match(Topic) when is_binary(Topic) ->
MatchedTopics = mnesia:async_dirty(fun emqttd_trie:match/1, [Topic]),
lists:append([mnesia:dirty_read(topic, Name) || Name <- MatchedTopics]).
%% ets:lookup for topic table will be copied.
lists:append([ets:lookup(topic, Name) || Name <- MatchedTopics]).
%%%=============================================================================
%%% gen_server callbacks
@ -226,25 +236,23 @@ match(Topic) when is_binary(Topic) ->
init([Pool, Id, Opts]) ->
?ROUTER:init(Opts),
?GPROC_POOL(join, Pool, Id),
process_flag(priority, high),
{ok, #state{pool = Pool, id = Id}}.
handle_call({subscribe, {SubId, SubPid}, TopicTable}, _From, State) ->
%% Clean aging topics
?HELPER:clean([Topic || {Topic, _Qos} <- TopicTable]),
%% Add routes first
?ROUTER:add_routes(TopicTable, SubPid),
%% Add topics
Node = node(),
TRecords = [#mqtt_topic{topic = Topic, node = Node} || {Topic, _Qos} <- TopicTable],
Topics = [#mqtt_topic{topic = Topic, node = node()} || {Topic, _Qos} <- TopicTable],
%% Add subscriptions
case mnesia:transaction(fun add_topics/1, [TRecords]) of
case mnesia:transaction(fun add_topics/1, [Topics]) of
{atomic, _} ->
%%TODO: store subscription
%% mnesia:async_dirty(fun add_subscriptions/2, [SubId, TopicTable]),
if_subscription(
fun(_) ->
%% Add subscriptions
Args = [fun add_subscriptions/2, [SubId, TopicTable]],
emqttd_pooler:async_submit({mnesia, async_dirty, Args})
end),
{reply, {ok, [Qos || {_Topic, Qos} <- TopicTable]}, State};
{aborted, Error} ->
{reply, {error, Error}, State}
@ -257,10 +265,12 @@ handle_call(Req, _From, State) ->
handle_cast({unsubscribe, {SubId, SubPid}, Topics}, State) ->
%% Delete routes first
?ROUTER:delete_routes(Topics, SubPid),
%% Remove subscriptions
mnesia:async_dirty(fun remove_subscriptions/2, [SubId, Topics]),
if_subscription(
fun(_) ->
Args = [fun remove_subscriptions/2, [SubId, Topics]],
emqttd_pooler:async_submit({mnesia, async_dirty, Args})
end),
{noreply, State};
handle_cast(Msg, State) ->
@ -268,7 +278,13 @@ handle_cast(Msg, State) ->
{noreply, State}.
handle_info({'DOWN', _Mon, _Type, DownPid, _Info}, State) ->
Routes = ?ROUTER:lookup_routes(DownPid),
%% Delete all routes of the process
?ROUTER:delete_routes(DownPid),
?HELPER:aging([Topic || {Topic, _Qos} <- Routes, not ?ROUTER:has_route(Topic)]),
{noreply, State, hibernate};
handle_info(Info, State) ->

View File

@ -19,100 +19,177 @@
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc PubSub Helper
%%% @doc PubSub Route Aging Helper
%%%
%%% @author Feng Lee <feng@emqtt.io>
%%%-----------------------------------------------------------------------------
-module(emqttd_pubsub_helper).
-behaviour(gen_server).
-behaviour(gen_server2).
-include("emqttd.hrl").
-define(SERVER, ?MODULE).
%% API Function Exports
-export([start_link/1, clean/1, setstats/1]).
-export([start_link/1, aging/1, setstats/1]).
%% ------------------------------------------------------------------
%% gen_server Function Exports
%% ------------------------------------------------------------------
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-record(aging, {topics, timer}).
-ifdef(TEST).
-compile(export_all).
-endif.
-record(aging, {topics, time, tref}).
-record(state, {aging :: #aging{}}).
%% ------------------------------------------------------------------
%% API Function Definitions
%% ------------------------------------------------------------------
-define(SERVER, ?MODULE).
-define(ROUTER, emqttd_router).
%%%=============================================================================
%%% API
%%%=============================================================================
%%------------------------------------------------------------------------------
%% @doc Start pubsub helper.
%% @end
%%------------------------------------------------------------------------------
-spec start_link(list(tuple())) -> {ok, pid()} | ignore | {error, any()}.
start_link(Opts) ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [Opts], []).
gen_server2:start_link({local, ?SERVER}, ?MODULE, [Opts], []).
clean(Topics) ->
ok.
%%------------------------------------------------------------------------------
%% @doc Aging topics
%% @end
%%------------------------------------------------------------------------------
-spec aging(list(binary())) -> ok.
aging(Topics) ->
gen_server2:cast(?SERVER, {aging, Topics}).
setstats(topic) ->
Size = mnesia:table_info(topic, size),
emqttd_stats:setstats('topics/count', 'topics/max', Size);
emqttd_stats:setstats('topics/count', 'topics/max',
mnesia:table_info(topic, size));
setstats(subscription) ->
ok.
emqttd_stats:setstats('subscriptions/count', 'subscriptions/max',
mnesia:table_info(subscription, size)).
%% ------------------------------------------------------------------
%% gen_server Function Definitions
%% ------------------------------------------------------------------
%%%=============================================================================
%%% gen_server callbacks
%%%=============================================================================
init([Opts]) ->
mnesia:subscribe(system),
AgingSecs = proplists:get_value(route_aging, Opts, 5),
%% Aging Timer
AgingSecs = proplists:get_value(aging, Opts, 5),
{ok, AgingTref} = start_tick(AgingSecs div 2),
{ok, TRef} = timer:send_interval(timer:seconds(AgingSecs), aging),
{ok, #state{aging = #aging{topics = dict:new(),
time = AgingSecs,
tref = AgingTref}}}.
{ok, #state{aging = #aging{topics = [], timer = TRef}}}.
start_tick(Secs) ->
timer:send_interval(timer:seconds(Secs), {clean, aged}).
handle_call(_Request, _From, State) ->
{reply, ok, State}.
handle_cast({aging, Topics}, State = #state{aging = Aging}) ->
#aging{topics = Dict} = Aging,
TS = emqttd_util:now_to_secs(),
Dict1 =
lists:foldl(fun(Topic, Acc) ->
case dict:find(Topic, Acc) of
{ok, _} -> Acc;
error -> dict:store(Topic, TS, Acc)
end
end, Dict, Topics),
{noreply, State#state{aging = Aging#aging{topics = Dict1}}};
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info({clean, aged}, State = #state{aging = Aging}) ->
#aging{topics = Dict, time = Time} = Aging,
ByTime = emqttd_util:now_to_secs() - Time,
Dict1 = try_clean(ByTime, dict:to_list(Dict)),
NewAging = Aging#aging{topics = dict:from_list(Dict1)},
{noreply, State#state{aging = NewAging}, hibernate};
handle_info({mnesia_system_event, {mnesia_down, Node}}, State) ->
Pattern = #mqtt_topic{_ = '_', node = Node},
F = fun() ->
[mnesia:delete_object(topic, R, write) ||
R <- mnesia:match_object(topic, Pattern, write)]
end,
mnesia:async_dirty(F),
{noreply, State};
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, #state{aging = #aging{timer = TRef}}) ->
timer:cancel(TRef),
TopicR = #mqtt_topic{_ = '_', node = node()},
F = fun() ->
[mnesia:delete_object(topic, R, write) || R <- mnesia:match_object(topic, TopicR, write)]
%%TODO: remove trie??
end,
mnesia:transaction(F),
ok.
terminate(_Reason, #state{aging = #aging{tref = TRef}}) ->
timer:cancel(TRef).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%% ------------------------------------------------------------------
%% Internal Function Definitions
%% ------------------------------------------------------------------
%%%=============================================================================
%%% Internal Functions
%%%=============================================================================
try_remove_topic(TopicR = #mqtt_topic{topic = Topic}) ->
case mnesia:read({subscriber, Topic}) of
[] ->
mnesia:delete_object(topic, TopicR, write),
case mnesia:read(topic, Topic) of
[] -> emqttd_trie:delete(Topic);
_ -> ok
end;
_ ->
ok
try_clean(ByTime, List) ->
try_clean(ByTime, List, []).
try_clean(_ByTime, [], Acc) ->
Acc;
try_clean(ByTime, [{Topic, TS} | Left], Acc) ->
case ?ROUTER:has_route(Topic) of
false ->
try_clean2(ByTime, {Topic, TS}, Left, Acc);
true ->
try_clean(ByTime, Left, Acc)
end.
%%%=============================================================================
%%% Stats functions
%%%=============================================================================
try_clean2(ByTime, {Topic, TS}, Left, Acc) when TS > ByTime ->
try_clean(ByTime, Left, [{Topic, TS}|Acc]);
try_clean2(ByTime, {Topic, _TS}, Left, Acc) ->
TopicR = #mqtt_topic{topic = Topic, node = node()},
io:format("Try to remove topic: ~p~n", [Topic]),
mnesia:transaction(fun try_remove_topic/1, [TopicR]),
try_clean(ByTime, Left, Acc).
try_remove_topic(TopicR = #mqtt_topic{topic = Topic}) ->
%% Lock topic first
case mnesia:wread({topic, Topic}) of
[] -> ok;
[TopicR] ->
if_no_route(Topic, fun() ->
%% Remove topic and trie
mnesia:delete_object(topic, TopicR, write),
emqttd_trie:delete(Topic)
end);
_More ->
if_no_route(Topic, fun() ->
%% Remove topic
mnesia:delete_object(topic, TopicR, write)
end)
end.
if_no_route(Topic, Fun) ->
case ?ROUTER:has_route(Topic) of
true -> ok;
false -> Fun()
end.

View File

@ -39,7 +39,7 @@
-include("emqttd_protocol.hrl").
-export([init/1, route/2, lookup_routes/1,
-export([init/1, route/2, lookup_routes/1, has_route/1,
add_routes/2, delete_routes/1, delete_routes/2]).
-ifdef(TEST).
@ -92,6 +92,14 @@ add_routes(TopicTable, Pid) when is_pid(Pid) ->
lookup_routes(Pid) when is_pid(Pid) ->
[{Topic, Qos} || {_, Topic, Qos} <- ets:lookup(reverse_route, Pid)].
%%------------------------------------------------------------------------------
%% @doc Has Route
%% @end
%%------------------------------------------------------------------------------
-spec has_route(binary()) -> boolean().
has_route(Topic) ->
ets:member(route, Topic).
%%------------------------------------------------------------------------------
%% @doc Delete Routes.
%% @end

View File

@ -320,7 +320,7 @@ handle_cast({subscribe, TopicTable0, AckFun}, Session = #session{client_id = Cli
hibernate(Session);
_ ->
%% subscribe first and don't care if the subscriptions have been existed
{ok, GrantedQos} = emqttd_pubsub:subscribe(TopicTable),
{ok, GrantedQos} = emqttd_pubsub:subscribe(ClientId, TopicTable),
AckFun(GrantedQos),

View File

@ -51,7 +51,6 @@ start_link(StatsFun) ->
init([StatsFun]) ->
mnesia:subscribe(system),
{ok, TRef} = timer:send_interval(timer:seconds(1), tick),
StatsFun = emqttd_stats:statsfun('sessions/count', 'sessions/max'),
{ok, #state{stats_fun = StatsFun, tick_tref = TRef}}.
handle_call(_Request, _From, State) ->