From 855152f653cc1a9de2f9627c30ecfefb40238030 Mon Sep 17 00:00:00 2001 From: Feng Date: Sat, 5 Dec 2015 02:18:06 +0800 Subject: [PATCH] spec/1 --- src/emqttd_client.erl | 4 +- src/emqttd_cm_sup.erl | 2 +- src/emqttd_http.erl | 4 +- src/emqttd_mnesia.erl | 2 +- src/emqttd_opts.erl | 8 ++-- src/emqttd_pool_sup.erl | 12 ++++-- src/emqttd_pooler.erl | 9 +++-- src/emqttd_pubsub.erl | 78 ++++++++++++++++++++++++------------ src/emqttd_pubsub_helper.erl | 37 ++++++++--------- src/emqttd_pubsub_sup.erl | 14 +++++-- src/emqttd_sm_sup.erl | 3 +- 11 files changed, 103 insertions(+), 70 deletions(-) diff --git a/src/emqttd_client.erl b/src/emqttd_client.erl index f7e8aa85e..216d285e7 100644 --- a/src/emqttd_client.erl +++ b/src/emqttd_client.erl @@ -36,7 +36,7 @@ %% API Function Exports -export([start_link/2, session/1, info/1, kick/1]). -%% SUB/UNSUB Asynchronously, called by plugins. +%% SUB/UNSUB Asynchronously. Called by plugins. -export([subscribe/2, unsubscribe/2]). %% gen_server Function Exports @@ -243,7 +243,7 @@ with_session(Fun, State = #client_state{proto_state = ProtoState}) -> Fun(emqttd_protocol:session(ProtoState)), hibernate(State). -%% receive and parse tcp data +%% Receive and parse tcp data received(<<>>, State) -> hibernate(State); diff --git a/src/emqttd_cm_sup.erl b/src/emqttd_cm_sup.erl index 311513cf9..6640c7cdd 100644 --- a/src/emqttd_cm_sup.erl +++ b/src/emqttd_cm_sup.erl @@ -48,7 +48,7 @@ init([]) -> %% CM Pool Sup MFA = {?CM, start_link, [emqttd_stats:statsfun('clients/count', 'clients/max')]}, - PoolSup = emqttd_pool_sup:spec(pool_sup, [?CM, hash, erlang:system_info(schedulers), MFA]), + PoolSup = emqttd_pool_sup:spec([?CM, hash, erlang:system_info(schedulers), MFA]), {ok, {{one_for_all, 10, 3600}, [PoolSup]}}. diff --git a/src/emqttd_http.erl b/src/emqttd_http.erl index 87d5e7b67..22f5e5f8c 100644 --- a/src/emqttd_http.erl +++ b/src/emqttd_http.erl @@ -41,7 +41,7 @@ handle_request('GET', "/status", Req) -> AppStatus = case lists:keysearch(emqttd, 1, application:which_applications()) of false -> not_running; - {value, _Ver} -> running + {value, _Val} -> running end, Status = io_lib:format("Node ~s is ~s~nemqttd is ~s", [node(), InternalStatus, AppStatus]), @@ -78,7 +78,7 @@ handle_request('POST', "/mqtt/publish", Req) -> %% MQTT Over WebSocket %%------------------------------------------------------------------------------ handle_request('GET', "/mqtt", Req) -> - lager:info("Websocket Connection from: ~s", [Req:get(peer)]), + lager:info("WebSocket Connection from: ~s", [Req:get(peer)]), Upgrade = Req:get_header_value("Upgrade"), Proto = Req:get_header_value("Sec-WebSocket-Protocol"), case {is_websocket(Upgrade), Proto} of diff --git a/src/emqttd_mnesia.erl b/src/emqttd_mnesia.erl index 4d3ca92e4..06e2bea7e 100644 --- a/src/emqttd_mnesia.erl +++ b/src/emqttd_mnesia.erl @@ -35,7 +35,7 @@ start() -> case init_schema() of - ok -> + ok -> ok; {error, {_Node, {already_exists, _Node}}} -> ok; diff --git a/src/emqttd_opts.erl b/src/emqttd_opts.erl index ec324b713..147791cdd 100644 --- a/src/emqttd_opts.erl +++ b/src/emqttd_opts.erl @@ -35,14 +35,12 @@ merge(Defaults, Options) -> lists:foldl( fun({Opt, Val}, Acc) -> case lists:keymember(Opt, 1, Acc) of - true -> - lists:keyreplace(Opt, 1, Acc, {Opt, Val}); - false -> - [{Opt, Val}|Acc] + true -> lists:keyreplace(Opt, 1, Acc, {Opt, Val}); + false -> [{Opt, Val}|Acc] end; (Opt, Acc) -> case lists:member(Opt, Acc) of - true -> Acc; + true -> Acc; false -> [Opt | Acc] end end, Defaults, Options). diff --git a/src/emqttd_pool_sup.erl b/src/emqttd_pool_sup.erl index 8d0694a94..70a70a967 100644 --- a/src/emqttd_pool_sup.erl +++ b/src/emqttd_pool_sup.erl @@ -28,14 +28,18 @@ -behaviour(supervisor). %% API --export([spec/2, start_link/3, start_link/4]). +-export([spec/1, spec/2, start_link/3, start_link/4]). %% Supervisor callbacks -export([init/1]). +-spec spec(list()) -> supervisor:child_spec(). +spec(Args) -> + spec(pool_sup, Args). + -spec spec(any(), list()) -> supervisor:child_spec(). -spec(Id, Args) -> - {Id, {?MODULE, start_link, Args}, +spec(ChildId, Args) -> + {ChildId, {?MODULE, start_link, Args}, transient, infinity, supervisor, [?MODULE]}. -spec start_link(atom(), atom(), mfa()) -> {ok, pid()} | {error, any()}. @@ -47,7 +51,7 @@ start_link(Pool, Type, MFA) -> start_link(Pool, Type, Size, MFA) -> supervisor:start_link({local, sup_name(Pool)}, ?MODULE, [Pool, Type, Size, MFA]). -sup_name(Pool) -> +sup_name(Pool) when is_atom(Pool) -> list_to_atom(atom_to_list(Pool) ++ "_pool_sup"). init([Pool, Type, Size, {M, F, Args}]) -> diff --git a/src/emqttd_pooler.erl b/src/emqttd_pooler.erl index 67121be51..909690412 100644 --- a/src/emqttd_pooler.erl +++ b/src/emqttd_pooler.erl @@ -63,16 +63,17 @@ name(Id) -> %% @end %%------------------------------------------------------------------------------ submit(Fun) -> - Worker = gproc_pool:pick_worker(pooler), - gen_server:call(Worker, {submit, Fun}, infinity). + gen_server:call(worker(), {submit, Fun}, infinity). %%------------------------------------------------------------------------------ %% @doc Submit work to pooler asynchronously %% @end %%------------------------------------------------------------------------------ async_submit(Fun) -> - Worker = gproc_pool:pick_worker(pooler), - gen_server:cast(Worker, {async_submit, Fun}). + gen_server:cast(worker(), {async_submit, Fun}). + +worker() -> + gproc_pool:pick_worker(pooler). %%%============================================================================= %%% gen_server callbacks diff --git a/src/emqttd_pubsub.erl b/src/emqttd_pubsub.erl index f2b545784..403c2d060 100644 --- a/src/emqttd_pubsub.erl +++ b/src/emqttd_pubsub.erl @@ -40,9 +40,9 @@ -copy_mnesia({mnesia, [copy]}). %% API Exports --export([start_link/3]). +-export([start_link/4]). --export([create/1, subscribe/1, subscribe/2, +-export([create/2, subscribe/1, subscribe/2, unsubscribe/1, unsubscribe/2, publish/1]). %% Local node @@ -56,7 +56,7 @@ -compile(export_all). -endif. --record(state, {pool, id}). +-record(state, {pool, id, statsfun}). -define(ROUTER, emqttd_router). @@ -123,26 +123,33 @@ cache_env(Key) -> %% @doc Start one pubsub server %% @end %%------------------------------------------------------------------------------ --spec start_link(Pool, Id, Opts) -> {ok, pid()} | ignore | {error, any()} when - Pool :: atom(), - Id :: pos_integer(), - Opts :: list(tuple()). -start_link(Pool, Id, Opts) -> - gen_server2:start_link({local, name(Id)}, ?MODULE, [Pool, Id, Opts], []). +-spec start_link(Pool, Id, StatsFun, Opts) -> {ok, pid()} | ignore | {error, any()} when + Pool :: atom(), + Id :: pos_integer(), + StatsFun :: fun(), + Opts :: list(tuple()). +start_link(Pool, Id, StatsFun, Opts) -> + gen_server2:start_link({local, name(Id)}, ?MODULE, [Pool, Id, StatsFun, Opts], []). name(Id) -> list_to_atom("emqttd_pubsub_" ++ integer_to_list(Id)). %%------------------------------------------------------------------------------ -%% @doc Create Topic. +%% @doc Create Topic or Subscription. %% @end %%------------------------------------------------------------------------------ --spec create(Topic :: binary()) -> ok | {error, Error :: any()}. -create(Topic) when is_binary(Topic) -> +-spec create(topic | subscription, binary()) -> ok | {error, any()}. +create(topic, Topic) when is_binary(Topic) -> Record = #mqtt_topic{topic = Topic, node = node()}, case mnesia:transaction(fun add_topic/1, [Record]) of {atomic, ok} -> ok; {aborted, Error} -> {error, Error} + end; + +create(subscription, {SubId, Topic, Qos}) -> + case mnesia:transaction(fun add_subscription/2, [SubId, {Topic, Qos}]) of + {atomic, ok} -> ok; + {aborted, Error} -> {error, Error} end. %%------------------------------------------------------------------------------ @@ -233,12 +240,13 @@ match(Topic) when is_binary(Topic) -> %%% gen_server callbacks %%%============================================================================= -init([Pool, Id, Opts]) -> +init([Pool, Id, StatsFun, Opts]) -> ?ROUTER:init(Opts), ?GPROC_POOL(join, Pool, Id), - {ok, #state{pool = Pool, id = Id}}. + {ok, #state{pool = Pool, id = Id, statsfun = StatsFun}}. -handle_call({subscribe, {SubId, SubPid}, TopicTable}, _From, State) -> +handle_call({subscribe, {SubId, SubPid}, TopicTable}, _From, + State = #state{statsfun = StatsFun}) -> %% Add routes first ?ROUTER:add_routes(TopicTable, SubPid), @@ -247,11 +255,13 @@ handle_call({subscribe, {SubId, SubPid}, TopicTable}, _From, State) -> case mnesia:transaction(fun add_topics/1, [Topics]) of {atomic, _} -> + StatsFun(topic), if_subscription( fun(_) -> %% Add subscriptions Args = [fun add_subscriptions/2, [SubId, TopicTable]], - emqttd_pooler:async_submit({mnesia, async_dirty, Args}) + emqttd_pooler:async_submit({mnesia, async_dirty, Args}), + StatsFun(subscription) end), {reply, {ok, [Qos || {_Topic, Qos} <- TopicTable]}, State}; {aborted, Error} -> @@ -262,14 +272,16 @@ handle_call(Req, _From, State) -> lager:error("Bad Request: ~p", [Req]), {reply, {error, badreq}, State}. -handle_cast({unsubscribe, {SubId, SubPid}, Topics}, State) -> +handle_cast({unsubscribe, {SubId, SubPid}, Topics}, State = #state{statsfun = StatsFun}) -> %% Delete routes first ?ROUTER:delete_routes(Topics, SubPid), + %% Remove subscriptions if_subscription( fun(_) -> Args = [fun remove_subscriptions/2, [SubId, Topics]], - emqttd_pooler:async_submit({mnesia, async_dirty, Args}) + emqttd_pooler:async_submit({mnesia, async_dirty, Args}), + StatsFun(subscription) end), {noreply, State}; @@ -311,7 +323,7 @@ add_topic(TopicR = #mqtt_topic{topic = Topic}) -> mnesia:write(topic, TopicR, write); Records -> case lists:member(TopicR, Records) of - true -> ok; + true -> ok; false -> mnesia:write(topic, TopicR, write) end end. @@ -320,20 +332,36 @@ add_subscriptions(undefined, _TopicTable) -> ok; add_subscriptions(SubId, TopicTable) -> lists:foreach(fun({Topic, Qos}) -> - %%TODO: this is not right... - Subscription = #mqtt_subscription{subid = SubId, topic = Topic, qos = Qos}, - mnesia:write(subscription, Subscription, write) - end,TopicTable). + add_subscription(SubId, {Topic, Qos}) + end,TopicTable). + +add_subscription(SubId, {Topic, Qos}) -> + Subscription = #mqtt_subscription{subid = SubId, topic = Topic, qos = Qos}, + Pattern = #mqtt_subscription{subid = SubId, topic = Topic, qos = '_'}, + Records = mnesia:match_object(subscription, Pattern, write), + case lists:member(Subscription, Records) of + true -> + ok; + false -> + [delete_subscription(Record) || Record <- Records], + insert_subscription(Subscription) + end. + +insert_subscription(Record) -> + mnesia:write(subscription, Record, write). remove_subscriptions(undefined, _Topics) -> ok; remove_subscriptions(SubId, Topics) -> lists:foreach(fun(Topic) -> Pattern = #mqtt_subscription{subid = SubId, topic = Topic, qos = '_'}, - [mnesia:delete_object(subscription, Subscription, write) - || Subscription <- mnesia:match_object(subscription, Pattern, write)] + Records = mnesia:match_object(subscription, Pattern, write), + [delete_subscription(Record) || Record <- Records] end, Topics). +delete_subscription(Record) -> + mnesia:delete_object(subscription, Record, write). + %%%============================================================================= %%% Trace Functions %%%============================================================================= diff --git a/src/emqttd_pubsub_helper.erl b/src/emqttd_pubsub_helper.erl index 636ada983..6c8384b39 100644 --- a/src/emqttd_pubsub_helper.erl +++ b/src/emqttd_pubsub_helper.erl @@ -30,7 +30,7 @@ -include("emqttd.hrl"). %% API Function Exports --export([start_link/1, aging/1, setstats/1]). +-export([start_link/2, aging/1]). %% gen_server Function Exports -export([init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -42,7 +42,7 @@ -record(aging, {topics, time, tref}). --record(state, {aging :: #aging{}}). +-record(state, {aging :: #aging{}, statsfun}). -define(SERVER, ?MODULE). @@ -53,12 +53,12 @@ %%%============================================================================= %%------------------------------------------------------------------------------ -%% @doc Start pubsub helper. +%% @doc Start pubsub helper. %% @end %%------------------------------------------------------------------------------ --spec start_link(list(tuple())) -> {ok, pid()} | ignore | {error, any()}. -start_link(Opts) -> - gen_server2:start_link({local, ?SERVER}, ?MODULE, [Opts], []). +-spec start_link(fun(), list(tuple())) -> {ok, pid()} | ignore | {error, any()}. +start_link(StatsFun, Opts) -> + gen_server2:start_link({local, ?SERVER}, ?MODULE, [StatsFun, Opts], []). %%------------------------------------------------------------------------------ %% @doc Aging topics @@ -68,19 +68,11 @@ start_link(Opts) -> aging(Topics) -> gen_server2:cast(?SERVER, {aging, Topics}). -setstats(topic) -> - emqttd_stats:setstats('topics/count', 'topics/max', - mnesia:table_info(topic, size)); -setstats(subscription) -> - emqttd_stats:setstats('subscriptions/count', 'subscriptions/max', - mnesia:table_info(subscription, size)). - %%%============================================================================= %%% gen_server callbacks %%%============================================================================= -init([Opts]) -> - +init([StatsFun, Opts]) -> mnesia:subscribe(system), AgingSecs = proplists:get_value(route_aging, Opts, 5), @@ -90,13 +82,15 @@ init([Opts]) -> {ok, #state{aging = #aging{topics = dict:new(), time = AgingSecs, - tref = AgingTref}}}. + tref = AgingTref}, + statsfun = StatsFun}}. start_tick(Secs) -> timer:send_interval(timer:seconds(Secs), {clean, aged}). -handle_call(_Request, _From, State) -> - {reply, ok, State}. +handle_call(Req, _From, State) -> + lager:error("Unexpected Request: ~p", [Req]), + {reply, {error, unsupported_request}, State}. handle_cast({aging, Topics}, State = #state{aging = Aging}) -> #aging{topics = Dict} = Aging, @@ -123,7 +117,7 @@ handle_info({clean, aged}, State = #state{aging = Aging}) -> NewAging = Aging#aging{topics = dict:from_list(Dict1)}, - {noreply, State#state{aging = NewAging}, hibernate}; + noreply(State#state{aging = NewAging}); handle_info({mnesia_system_event, {mnesia_down, Node}}, State) -> Pattern = #mqtt_topic{_ = '_', node = Node}, @@ -132,7 +126,7 @@ handle_info({mnesia_system_event, {mnesia_down, Node}}, State) -> R <- mnesia:match_object(topic, Pattern, write)] end, mnesia:async_dirty(F), - {noreply, State}; + noreply(State); handle_info(_Info, State) -> {noreply, State}. @@ -147,6 +141,9 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal Functions %%%============================================================================= +noreply(State = #state{statsfun = StatsFun}) -> + StatsFun(topic), {noreply, State, hibernate}. + try_clean(ByTime, List) -> try_clean(ByTime, List, []). diff --git a/src/emqttd_pubsub_sup.erl b/src/emqttd_pubsub_sup.erl index 4e85a6698..63d6efd6e 100644 --- a/src/emqttd_pubsub_sup.erl +++ b/src/emqttd_pubsub_sup.erl @@ -42,16 +42,22 @@ start_link() -> init([Opts]) -> %% PubSub Helper - Helper = {helper, {?HELPER, start_link, [Opts]}, + Helper = {helper, {?HELPER, start_link, [fun stats/1, Opts]}, permanent, infinity, worker, [?HELPER]}, %% PubSub Pool Sup - MFA = {emqttd_pubsub, start_link, [Opts]}, - PoolSup = emqttd_pool_sup:spec(pool_sup, [ - pubsub, hash, pool_size(Opts), MFA]), + MFA = {emqttd_pubsub, start_link, [fun stats/1, Opts]}, + PoolSup = emqttd_pool_sup:spec([pubsub, hash, pool_size(Opts), MFA]), {ok, {{one_for_all, 10, 60}, [Helper, PoolSup]}}. pool_size(Opts) -> Schedulers = erlang:system_info(schedulers), proplists:get_value(pool_size, Opts, Schedulers). +stats(topic) -> + emqttd_stats:setstats('topics/count', 'topics/max', + mnesia:table_info(topic, size)); +stats(subscription) -> + emqttd_stats:setstats('subscriptions/count', 'subscriptions/max', + mnesia:table_info(subscription, size)). + diff --git a/src/emqttd_sm_sup.erl b/src/emqttd_sm_sup.erl index d0b319331..39c4eb5ee 100644 --- a/src/emqttd_sm_sup.erl +++ b/src/emqttd_sm_sup.erl @@ -56,8 +56,7 @@ init([]) -> %% SM Pool Sup MFA = {?SM, start_link, []}, - PoolSup = emqttd_pool_sup:spec(pool_sup, [ - ?SM, hash, erlang:system_info(schedulers), MFA]), + PoolSup = emqttd_pool_sup:spec([?SM, hash, erlang:system_info(schedulers), MFA]), {ok, {{one_for_all, 10, 3600}, [Helper, PoolSup]}}.