From dfdad8bcb61ae776ccf3cf286cbe85306e0babd2 Mon Sep 17 00:00:00 2001 From: Feng Date: Sun, 14 Feb 2016 01:48:49 +0800 Subject: [PATCH] refactor the router and pubsub --- src/emqttd_pubsub.erl | 9 ++- src/emqttd_router.erl | 104 +++++++++++++++------------ test/emqttd_access_control_tests.erl | 2 +- test/emqttd_mqueue_tests.erl | 1 - test/emqttd_tests.erl | 31 -------- 5 files changed, 65 insertions(+), 82 deletions(-) delete mode 100644 test/emqttd_tests.erl diff --git a/src/emqttd_pubsub.erl b/src/emqttd_pubsub.erl index 908f23610..cab13ee3e 100644 --- a/src/emqttd_pubsub.erl +++ b/src/emqttd_pubsub.erl @@ -228,9 +228,9 @@ publish(To, Msg) -> %% @doc Match Topic Name with Topic Filters -spec match(emqttd_topic:topic()) -> [mqtt_topic()]. match(To) -> - MatchedTopics = mnesia:async_dirty(fun emqttd_trie:match/1, [To]), + Matched = mnesia:async_dirty(fun emqttd_trie:match/1, [To]), %% ets:lookup for topic table will be replicated to all nodes. - lists:append([ets:lookup(topic, Topic) || Topic <- MatchedTopics]). + lists:append([ets:lookup(topic, Topic) || Topic <- [To | Matched]]). %%-------------------------------------------------------------------- %% gen_server callbacks @@ -333,7 +333,10 @@ add_topics(Records) -> add_topic(TopicR = #mqtt_topic{topic = Topic}) -> case mnesia:wread({topic, Topic}) of [] -> - ok = emqttd_trie:insert(Topic), + case emqttd_topic:wildcard(Topic) of + true -> emqttd_trie:insert(Topic); + false -> ok + end, mnesia:write(topic, TopicR, write); Records -> case lists:member(TopicR, Records) of diff --git a/src/emqttd_router.erl b/src/emqttd_router.erl index 76816302e..8cd03302f 100644 --- a/src/emqttd_router.erl +++ b/src/emqttd_router.erl @@ -14,7 +14,9 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% @doc MQTT Message Router +%% @doc +%% The Message Router on Local Node. +%% @end -module(emqttd_router). -behaviour(gen_server2). @@ -43,22 +45,26 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). +-ifdef(TEST). +-compile(export_all). +-endif. + -record(aging, {topics, time, tref}). --record(state, {pool, id, statsfun, aging :: #aging{}}). +-record(state, {pool, id, aging :: #aging{}, statsfun}). -%% @doc Start a local router. +%% @doc Start a router. -spec start_link(atom(), pos_integer(), fun((atom()) -> ok), list()) -> {ok, pid()} | {error, any()}. start_link(Pool, Id, StatsFun, Env) -> gen_server2:start_link({local, ?PROC_NAME(?MODULE, Id)}, ?MODULE, [Pool, Id, StatsFun, Env], []). -%% @doc Route Message on the local node. +%% @doc Route Message on this node. -spec route(emqttd_topic:topic(), mqtt_message()) -> any(). route(Queue = <<"$Q/", _Q>>, Msg) -> case lookup_routes(Queue) of [] -> - emqttd_metrics:inc('messages/dropped'); + dropped(Queue); [SubPid] -> SubPid ! {dispatch, Queue, Msg}; Routes -> @@ -70,8 +76,8 @@ route(Queue = <<"$Q/", _Q>>, Msg) -> route(Topic, Msg) -> case lookup_routes(Topic) of [] -> - emqttd_metrics:inc('messages/dropped'); - [SubPid] -> %% optimize + dropped(Topic); + [SubPid] -> SubPid ! {dispatch, Topic, Msg}; Routes -> lists:foreach(fun(SubPid) -> @@ -79,9 +85,16 @@ route(Topic, Msg) -> end, Routes) end. +%% @private +%% @doc Ingore $SYS Messages. +dropped(<<"$SYS/", _/binary>>) -> + ok; +dropped(_Topic) -> + emqttd_metrics:inc('messages/dropped'). + %% @doc Has Route? -spec has_route(emqttd_topic:topic()) -> boolean(). -has_route(Topic) -> +has_route(Topic) when is_binary(Topic) -> ets:member(route, Topic). %% @doc Lookup Routes @@ -94,12 +107,12 @@ lookup_routes(Topic) when is_binary(Topic) -> [] end. -%% @doc Add Route. +%% @doc Add Route -spec add_route(emqttd_topic:topic(), pid()) -> ok. add_route(Topic, Pid) when is_pid(Pid) -> call(pick(Topic), {add_route, Topic, Pid}). -%% @doc Add Routes. +%% @doc Add Routes -spec add_routes(list(emqttd_topic:topic()), pid()) -> ok. add_routes([], _Pid) -> ok; @@ -111,12 +124,12 @@ add_routes(Topics, Pid) -> call(Router, {add_routes, Slice, Pid}) end, slice(Topics)). -%% @doc Delete Route. +%% @doc Delete Route -spec delete_route(emqttd_topic:topic(), pid()) -> ok. delete_route(Topic, Pid) -> cast(pick(Topic), {delete_route, Topic, Pid}). -%% @doc Delete Routes. +%% @doc Delete Routes -spec delete_routes(list(emqttd_topic:topic()), pid()) -> ok. delete_routes([Topic], Pid) -> delete_route(Topic, Pid); @@ -136,8 +149,11 @@ slice(Topics) -> pick(Topic) -> gproc_pool:pick_worker(router, Topic). +%% @doc For unit test. stop(Id) when is_integer(Id) -> - gen_server2:call(?PROC_NAME(?MODULE, Id), stop). + gen_server2:call(?PROC_NAME(?MODULE, Id), stop); +stop(Pid) when is_pid(Pid) -> + gen_server2:call(Pid, stop). call(Router, Request) -> gen_server2:call(Router, Request, infinity). @@ -147,21 +163,22 @@ cast(Router, Msg) -> init([Pool, Id, StatsFun, Opts]) -> - emqttd_time:seed(), - %% Calls from pubsub should be scheduled first? process_flag(priority, high), + emqttd_time:seed(), + ?GPROC_POOL(join, Pool, Id), + Aging = init_aging(Opts), + + {ok, #state{pool = Pool, id = Id, aging = Aging, statsfun = StatsFun}}. + +%% Init Aging +init_aging(Opts) -> AgingSecs = proplists:get_value(route_aging, Opts, 5), - - %% Aging Timer {ok, AgingTref} = start_tick(AgingSecs + random:uniform(AgingSecs)), - - Aging = #aging{topics = dict:new(), time = AgingSecs, tref = AgingTref}, - - {ok, #state{pool = Pool, id = Id, statsfun = StatsFun, aging = Aging}}. + #aging{topics = dict:new(), time = AgingSecs, tref = AgingTref}. start_tick(Secs) -> timer:send_interval(timer:seconds(Secs), {clean, aged}). @@ -181,24 +198,15 @@ handle_call(Req, _From, State) -> ?UNEXPECTED_REQ(Req, State). handle_cast({delete_route, Topic, Pid}, State = #state{aging = Aging}) -> - ets:delete_object(route, {Topic, Pid}), - NewState = - case has_route(Topic) of - false -> State#state{aging = store_aged(Topic, Aging)}; - true -> State - end, - {noreply, setstats(NewState)}; + Aging1 = delete_route_(Topic, Pid, Aging), + {noreply, setstats(State#state{aging = Aging1})}; handle_cast({delete_routes, Topics, Pid}, State) -> - NewAging = + Aging1 = lists:foldl(fun(Topic, Aging) -> - ets:delete_object(route, {Topic, Pid}), - case has_route(Topic) of - false -> store_aged(Topic, Aging); - true -> Aging - end + delete_route_(Topic, Pid, Aging) end, State#state.aging, Topics), - {noreply, setstats(State#state{aging = NewAging})}; + {noreply, setstats(State#state{aging = Aging1})}; handle_cast(Msg, State) -> ?UNEXPECTED_MSG(Msg, State). @@ -211,9 +219,9 @@ handle_info({clean, aged}, State = #state{aging = Aging}) -> Dict1 = try_clean(ByTime, dict:to_list(Dict)), - NewAging = Aging#aging{topics = dict:from_list(Dict1)}, + Aging1 = Aging#aging{topics = dict:from_list(Dict1)}, - {noreply, State#state{aging = NewAging}, hibernate}; + {noreply, State#state{aging = Aging1}, hibernate}; handle_info(Info, State) -> ?UNEXPECTED_INFO(Info, State). @@ -225,6 +233,13 @@ terminate(_Reason, #state{pool = Pool, id = Id, aging = #aging{tref = TRef}}) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. +delete_route_(Topic, Pid, Aging) -> + ets:delete_object(route, {Topic, Pid}), + case has_route(Topic) of + false -> store_aged(Topic, Aging); + true -> Aging + end. + try_clean(ByTime, List) -> try_clean(ByTime, List, []). @@ -253,15 +268,12 @@ try_clean2(ByTime, {Topic, _TS}, Left, Acc) -> try_remove_topic(TopicR = #mqtt_topic{topic = Topic}) -> %% Lock topic first case mnesia:wread({topic, Topic}) of - [] -> - ok; %% mnesia:abort(not_found); - [TopicR] -> - %% Remove topic and trie - delete_topic(TopicR), - emqttd_trie:delete(Topic); - _More -> - %% Remove topic only - delete_topic(TopicR) + [] -> ok; + [TopicR] -> %% Remove topic and trie + delete_topic(TopicR), + emqttd_trie:delete(Topic); + _More -> %% Remove topic only + delete_topic(TopicR) end. delete_topic(TopicR) -> diff --git a/test/emqttd_access_control_tests.erl b/test/emqttd_access_control_tests.erl index 78796b6b1..1efc686cc 100644 --- a/test/emqttd_access_control_tests.erl +++ b/test/emqttd_access_control_tests.erl @@ -39,7 +39,7 @@ setup() -> {auth, [{anonymous, []}]}, {acl, [ %% ACL config %% Internal ACL module - {internal, [{file, "./testdata/test_acl.config"}, {nomatch, allow}]} + {internal, [{file, "../testdata/test_acl.config"}, {nomatch, allow}]} ]} ], ?AC:start_link(AclOpts). diff --git a/test/emqttd_mqueue_tests.erl b/test/emqttd_mqueue_tests.erl index a8d6e1137..20111a66a 100644 --- a/test/emqttd_mqueue_tests.erl +++ b/test/emqttd_mqueue_tests.erl @@ -149,7 +149,6 @@ priority_mqueue2_test() -> Q4 = ?Q:in(#mqtt_message{topic = <<"y">>, qos = 1, payload = <<4>>}, Q3), ?assertEqual(4, ?Q:len(Q4)), {{value, Val}, Q5} = ?Q:out(Q4), - ?debugFmt("Val: ~p~n", [Val]), ?assertEqual(3, ?Q:len(Q5)). alarm_fun() -> fun(_, _) -> alarm_fun() end. diff --git a/test/emqttd_tests.erl b/test/emqttd_tests.erl deleted file mode 100644 index 5c4f6fd1f..000000000 --- a/test/emqttd_tests.erl +++ /dev/null @@ -1,31 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2012-2016 Feng Lee . -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqttd_tests). - --ifdef(TEST). - --include_lib("eunit/include/eunit.hrl"). - -start_stop_test() -> - application:start(lager), - application:ensure_all_started(emqttd), - application:stop(emqttd), - application:stop(esockd), - application:stop(gproc). - --endif. -