diff --git a/src/emqx_router.erl b/src/emqx_router.erl index c79482bd2..56f94eb08 100644 --- a/src/emqx_router.erl +++ b/src/emqx_router.erl @@ -30,7 +30,9 @@ -export([start_link/2]). %% Route APIs --export([add_route/2, add_route/3, get_routes/1, del_route/2, del_route/3]). +-export([add_route/1, add_route/2, add_route/3]). +-export([get_routes/1]). +-export([del_route/1, del_route/2, del_route/3]). -export([has_routes/1, match_routes/1, print_routes/1]). %% Topics @@ -42,10 +44,17 @@ -type(destination() :: node() | {binary(), node()}). --record(state, {pool, id}). +-record(batch, {enabled, timer, pending}). + +-record(state, {pool, id, batch :: #batch{}}). -define(ROUTE, emqx_route). +-define(BATCH(Enabled), #batch{enabled = Enabled}). + +-define(BATCH(Enabled, Pending), + #batch{enabled = Enabled, pending = Pending}). + %%-------------------------------------------------------------------- %% Mnesia bootstrap %%-------------------------------------------------------------------- @@ -68,34 +77,45 @@ mnesia(copy) -> -> {ok, pid()} | ignore | {error, term()}). start_link(Pool, Id) -> gen_server:start_link({local, emqx_misc:proc_name(?MODULE, Id)}, - ?MODULE, [Pool, Id], [{hibernate_after, 10000}]). + ?MODULE, [Pool, Id], [{hibernate_after, 2000}]). %%-------------------------------------------------------------------- %% Route APIs %%-------------------------------------------------------------------- -%% @doc Add a route +-spec(add_route(topic() | route()) -> ok). +add_route(Topic) when is_binary(Topic) -> + add_route(#route{topic = Topic, dest = node()}); +add_route(Route = #route{topic = Topic}) -> + cast(pick(Topic), {add_route, Route}). + -spec(add_route(topic(), destination()) -> ok). add_route(Topic, Dest) when is_binary(Topic) -> - cast(pick(Topic), {add_route, #route{topic = Topic, dest = Dest}}). + add_route(#route{topic = Topic, dest = Dest}). -spec(add_route({pid(), reference()}, topic(), destination()) -> ok). add_route(From, Topic, Dest) when is_binary(Topic) -> - cast(pick(Topic), {add_route, From, #route{topic = Topic, dest = Dest}}). + Route = #route{topic = Topic, dest = Dest}, + cast(pick(Topic), {add_route, From, Route}). -%% @doc Get routes -spec(get_routes(topic()) -> [route()]). get_routes(Topic) -> ets:lookup(?ROUTE, Topic). -%% @doc Delete a route +-spec(del_route(topic() | route()) -> ok). +del_route(Topic) when is_binary(Topic) -> + del_route(#route{topic = Topic, dest = node()}); +del_route(Route = #route{topic = Topic}) -> + cast(pick(Topic), {del_route, Route}). + -spec(del_route(topic(), destination()) -> ok). del_route(Topic, Dest) when is_binary(Topic) -> - cast(pick(Topic), {del_route, #route{topic = Topic, dest = Dest}}). + del_route(#route{topic = Topic, dest = Dest}). -spec(del_route({pid(), reference()}, topic(), destination()) -> ok). del_route(From, Topic, Dest) when is_binary(Topic) -> - cast(pick(Topic), {del_route, From, #route{topic = Topic, dest = Dest}}). + Route = #route{topic = Topic, dest = Dest}, + cast(pick(Topic), {del_route, From, Route}). %% @doc Has routes? -spec(has_routes(topic()) -> boolean()). @@ -131,17 +151,20 @@ pick(Topic) -> %%-------------------------------------------------------------------- init([Pool, Id]) -> + rand:seed(exsplus, erlang:timestamp()), gproc_pool:connect_worker(Pool, {Pool, Id}), - {ok, #state{pool = Pool, id = Id}}. + Batch = #batch{enabled = emqx_config:get_env(route_batch_delete, false), + pending = sets:new()}, + {ok, ensure_batch_timer(#state{pool = Pool, id = Id, batch = Batch})}. handle_call(Req, _From, State) -> emqx_logger:error("[Router] Unexpected request: ~p", [Req]), {reply, ignore, State}. handle_cast({add_route, From, Route}, State) -> - _ = handle_cast({add_route, Route}, State), + {noreply, NewState} = handle_cast({add_route, Route}, State), gen_server:reply(From, ok), - {noreply, State}; + {noreply, NewState}; handle_cast({add_route, Route = #route{topic = Topic, dest = Dest}}, State) -> case lists:member(Route, get_routes(Topic)) of @@ -156,31 +179,36 @@ handle_cast({add_route, Route = #route{topic = Topic, dest = Dest}}, State) -> {noreply, State}; handle_cast({del_route, From, Route}, State) -> - _ = handle_cast({del_route, Route}, State), + {noreply, NewState} = handle_cast({del_route, Route}, State), gen_server:reply(From, ok), - {noreply, State}; + {noreply, NewState}; handle_cast({del_route, Route = #route{topic = Topic}}, State) -> %% Confirm if there are still subscribers... - case ets:member(emqx_subscriber, Topic) of - true -> ok; - false -> - case emqx_topic:wildcard(Topic) of - true -> log(trans(fun del_trie_route/1, [Route])); - false -> del_direct_route(Route) - end - end, - {noreply, State}; + {noreply, case ets:member(emqx_subscriber, Topic) of + true -> State; + false -> + case emqx_topic:wildcard(Topic) of + true -> log(trans(fun del_trie_route/1, [Route])), + State; + false -> del_direct_route(Route, State) + end + end}; handle_cast(Msg, State) -> emqx_logger:error("[Router] Unexpected msg: ~p", [Msg]), {noreply, State}. +handle_info({timeout, _TRef, batch_delete}, State = #state{batch = Batch}) -> + _ = del_direct_routes(Batch#batch.pending), + {noreply, ensure_batch_timer(State#state{batch = ?BATCH(true, sets:new())}), hibernate}; + handle_info(Info, State) -> emqx_logger:error("[Router] Unexpected info: ~p", [Info]), {noreply, State}. -terminate(_Reason, #state{pool = Pool, id = Id}) -> +terminate(_Reason, #state{pool = Pool, id = Id, batch = Batch}) -> + _ = cacel_batch_timer(Batch), gproc_pool:disconnect_worker(Pool, {Pool, Id}). code_change(_OldVsn, State, _Extra) -> @@ -190,6 +218,17 @@ code_change(_OldVsn, State, _Extra) -> %% Internal functions %%-------------------------------------------------------------------- +ensure_batch_timer(State = #state{batch = #batch{enabled = false}}) -> + State; +ensure_batch_timer(State = #state{batch = Batch}) -> + TRef = erlang:start_timer(50 + rand:uniform(50), self(), batch_delete), + State#state{batch = Batch#batch{timer = TRef}}. + +cacel_batch_timer(#batch{enabled = false}) -> + ok; +cacel_batch_timer(#batch{enabled = true, timer = TRef}) -> + erlang:cancel_timer(TRef). + add_direct_route(Route) -> mnesia:async_dirty(fun mnesia:write/3, [?ROUTE, Route, sticky_write]). @@ -200,9 +239,25 @@ add_trie_route(Route = #route{topic = Topic}) -> end, mnesia:write(?ROUTE, Route, sticky_write). +del_direct_route(Route, State = #state{batch = ?BATCH(false)}) -> + del_direct_route(Route), State; +del_direct_route(Route, State = #state{batch = Batch = ?BATCH(true, Pending)}) -> + State#state{batch = Batch#batch{pending = sets:add_element(Route, Pending)}}. + del_direct_route(Route) -> mnesia:async_dirty(fun mnesia:delete_object/3, [?ROUTE, Route, sticky_write]). +del_direct_routes([]) -> + ok; +del_direct_routes(Routes) -> + DelFun = fun(R = #route{topic = Topic}) -> + case ets:member(emqx_subscriber, Topic) of + true -> ok; + false -> mnesia:delete_object(?ROUTE, R, sticky_write) + end + end, + mnesia:async_dirty(fun lists:foreach/2, [DelFun, Routes]). + del_trie_route(Route = #route{topic = Topic}) -> case mnesia:wread({?ROUTE, Topic}) of [Route] -> %% Remove route and trie diff --git a/test/emqx_serializer_SUITE.erl b/test/emqx_serializer_SUITE.erl deleted file mode 100644 index d0a34fd20..000000000 --- a/test/emqx_serializer_SUITE.erl +++ /dev/null @@ -1,91 +0,0 @@ -%%%=================================================================== -%%% Copyright (c) 2013-2018 EMQ Inc. All rights reserved. -%%% -%%% Licensed under the Apache License, Version 2.0 (the "License"); -%%% you may not use this file except in compliance with the License. -%%% You may obtain a copy of the License at -%%% -%%% http://www.apache.org/licenses/LICENSE-2.0 -%%% -%%% Unless required by applicable law or agreed to in writing, software -%%% distributed under the License is distributed on an "AS IS" BASIS, -%%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%%% See the License for the specific language governing permissions and -%%% limitations under the License. -%%%=================================================================== - --module(emqx_serializer_SUITE). - --compile(export_all). - --include("emqx_mqtt.hrl"). - --include_lib("eunit/include/eunit.hrl"). - --import(emqx_serializer, [serialize/1]). - -all() -> - [serialize_connect, - serialize_connack, - serialize_publish, - serialize_puback, - serialize_pubrel, - serialize_subscribe, - serialize_suback, - serialize_unsubscribe, - serialize_unsuback, - serialize_pingreq, - serialize_pingresp, - serialize_disconnect]. - -serialize_connect(_) -> - serialize(?CONNECT_PACKET(#mqtt_packet_connect{})), - serialize(?CONNECT_PACKET(#mqtt_packet_connect{ - client_id = <<"clientId">>, - will_qos = ?QOS1, - will_flag = true, - will_retain = true, - will_topic = <<"will">>, - will_payload = <<"haha">>, - clean_sess = true})). - -serialize_connack(_) -> - ConnAck = #mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK}, - variable = #mqtt_packet_connack{ack_flags = 0, return_code = 0}}, - ?assertEqual(<<32,2,0,0>>, iolist_to_binary(serialize(ConnAck))). - -serialize_publish(_) -> - serialize(?PUBLISH_PACKET(?QOS_0, <<"Topic">>, undefined, <<"Payload">>)), - serialize(?PUBLISH_PACKET(?QOS_1, <<"Topic">>, 938, <<"Payload">>)), - serialize(?PUBLISH_PACKET(?QOS_2, <<"Topic">>, 99, long_payload())). - -serialize_puback(_) -> - serialize(?PUBACK_PACKET(?PUBACK, 10384)). - -serialize_pubrel(_) -> - serialize(?PUBREL_PACKET(10384)). - -serialize_subscribe(_) -> - TopicTable = [{<<"TopicQos0">>, ?QOS_0}, {<<"TopicQos1">>, ?QOS_1}, {<<"TopicQos2">>, ?QOS_2}], - serialize(?SUBSCRIBE_PACKET(10, TopicTable)). - -serialize_suback(_) -> - serialize(?SUBACK_PACKET(10, [?QOS_0, ?QOS_1, 128])). - -serialize_unsubscribe(_) -> - serialize(?UNSUBSCRIBE_PACKET(10, [<<"Topic1">>, <<"Topic2">>])). - -serialize_unsuback(_) -> - serialize(?UNSUBACK_PACKET(10)). - -serialize_pingreq(_) -> - serialize(?PACKET(?PINGREQ)). - -serialize_pingresp(_) -> - serialize(?PACKET(?PINGRESP)). - -serialize_disconnect(_) -> - serialize(?PACKET(?DISCONNECT)). - -long_payload() -> - iolist_to_binary(["payload." || _I <- lists:seq(1, 100)]).