Support batch delete

This commit is contained in:
Feng Lee 2018-05-10 22:03:59 +08:00
parent c11e8f453b
commit bf253ab9b3
2 changed files with 80 additions and 116 deletions

View File

@ -30,7 +30,9 @@
-export([start_link/2]).
%% Route APIs
-export([add_route/2, add_route/3, get_routes/1, del_route/2, del_route/3]).
-export([add_route/1, add_route/2, add_route/3]).
-export([get_routes/1]).
-export([del_route/1, del_route/2, del_route/3]).
-export([has_routes/1, match_routes/1, print_routes/1]).
%% Topics
@ -42,10 +44,17 @@
-type(destination() :: node() | {binary(), node()}).
-record(state, {pool, id}).
-record(batch, {enabled, timer, pending}).
-record(state, {pool, id, batch :: #batch{}}).
-define(ROUTE, emqx_route).
-define(BATCH(Enabled), #batch{enabled = Enabled}).
-define(BATCH(Enabled, Pending),
#batch{enabled = Enabled, pending = Pending}).
%%--------------------------------------------------------------------
%% Mnesia bootstrap
%%--------------------------------------------------------------------
@ -68,34 +77,45 @@ mnesia(copy) ->
-> {ok, pid()} | ignore | {error, term()}).
start_link(Pool, Id) ->
gen_server:start_link({local, emqx_misc:proc_name(?MODULE, Id)},
?MODULE, [Pool, Id], [{hibernate_after, 10000}]).
?MODULE, [Pool, Id], [{hibernate_after, 2000}]).
%%--------------------------------------------------------------------
%% Route APIs
%%--------------------------------------------------------------------
%% @doc Add a route
-spec(add_route(topic() | route()) -> ok).
add_route(Topic) when is_binary(Topic) ->
add_route(#route{topic = Topic, dest = node()});
add_route(Route = #route{topic = Topic}) ->
cast(pick(Topic), {add_route, Route}).
-spec(add_route(topic(), destination()) -> ok).
add_route(Topic, Dest) when is_binary(Topic) ->
cast(pick(Topic), {add_route, #route{topic = Topic, dest = Dest}}).
add_route(#route{topic = Topic, dest = Dest}).
-spec(add_route({pid(), reference()}, topic(), destination()) -> ok).
add_route(From, Topic, Dest) when is_binary(Topic) ->
cast(pick(Topic), {add_route, From, #route{topic = Topic, dest = Dest}}).
Route = #route{topic = Topic, dest = Dest},
cast(pick(Topic), {add_route, From, Route}).
%% @doc Get routes
-spec(get_routes(topic()) -> [route()]).
get_routes(Topic) ->
ets:lookup(?ROUTE, Topic).
%% @doc Delete a route
-spec(del_route(topic() | route()) -> ok).
del_route(Topic) when is_binary(Topic) ->
del_route(#route{topic = Topic, dest = node()});
del_route(Route = #route{topic = Topic}) ->
cast(pick(Topic), {del_route, Route}).
-spec(del_route(topic(), destination()) -> ok).
del_route(Topic, Dest) when is_binary(Topic) ->
cast(pick(Topic), {del_route, #route{topic = Topic, dest = Dest}}).
del_route(#route{topic = Topic, dest = Dest}).
-spec(del_route({pid(), reference()}, topic(), destination()) -> ok).
del_route(From, Topic, Dest) when is_binary(Topic) ->
cast(pick(Topic), {del_route, From, #route{topic = Topic, dest = Dest}}).
Route = #route{topic = Topic, dest = Dest},
cast(pick(Topic), {del_route, From, Route}).
%% @doc Has routes?
-spec(has_routes(topic()) -> boolean()).
@ -131,17 +151,20 @@ pick(Topic) ->
%%--------------------------------------------------------------------
init([Pool, Id]) ->
rand:seed(exsplus, erlang:timestamp()),
gproc_pool:connect_worker(Pool, {Pool, Id}),
{ok, #state{pool = Pool, id = Id}}.
Batch = #batch{enabled = emqx_config:get_env(route_batch_delete, false),
pending = sets:new()},
{ok, ensure_batch_timer(#state{pool = Pool, id = Id, batch = Batch})}.
handle_call(Req, _From, State) ->
emqx_logger:error("[Router] Unexpected request: ~p", [Req]),
{reply, ignore, State}.
handle_cast({add_route, From, Route}, State) ->
_ = handle_cast({add_route, Route}, State),
{noreply, NewState} = handle_cast({add_route, Route}, State),
gen_server:reply(From, ok),
{noreply, State};
{noreply, NewState};
handle_cast({add_route, Route = #route{topic = Topic, dest = Dest}}, State) ->
case lists:member(Route, get_routes(Topic)) of
@ -156,31 +179,36 @@ handle_cast({add_route, Route = #route{topic = Topic, dest = Dest}}, State) ->
{noreply, State};
handle_cast({del_route, From, Route}, State) ->
_ = handle_cast({del_route, Route}, State),
{noreply, NewState} = handle_cast({del_route, Route}, State),
gen_server:reply(From, ok),
{noreply, State};
{noreply, NewState};
handle_cast({del_route, Route = #route{topic = Topic}}, State) ->
%% Confirm if there are still subscribers...
case ets:member(emqx_subscriber, Topic) of
true -> ok;
false ->
case emqx_topic:wildcard(Topic) of
true -> log(trans(fun del_trie_route/1, [Route]));
false -> del_direct_route(Route)
end
end,
{noreply, State};
{noreply, case ets:member(emqx_subscriber, Topic) of
true -> State;
false ->
case emqx_topic:wildcard(Topic) of
true -> log(trans(fun del_trie_route/1, [Route])),
State;
false -> del_direct_route(Route, State)
end
end};
handle_cast(Msg, State) ->
emqx_logger:error("[Router] Unexpected msg: ~p", [Msg]),
{noreply, State}.
handle_info({timeout, _TRef, batch_delete}, State = #state{batch = Batch}) ->
_ = del_direct_routes(Batch#batch.pending),
{noreply, ensure_batch_timer(State#state{batch = ?BATCH(true, sets:new())}), hibernate};
handle_info(Info, State) ->
emqx_logger:error("[Router] Unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, #state{pool = Pool, id = Id}) ->
terminate(_Reason, #state{pool = Pool, id = Id, batch = Batch}) ->
_ = cacel_batch_timer(Batch),
gproc_pool:disconnect_worker(Pool, {Pool, Id}).
code_change(_OldVsn, State, _Extra) ->
@ -190,6 +218,17 @@ code_change(_OldVsn, State, _Extra) ->
%% Internal functions
%%--------------------------------------------------------------------
ensure_batch_timer(State = #state{batch = #batch{enabled = false}}) ->
State;
ensure_batch_timer(State = #state{batch = Batch}) ->
TRef = erlang:start_timer(50 + rand:uniform(50), self(), batch_delete),
State#state{batch = Batch#batch{timer = TRef}}.
cacel_batch_timer(#batch{enabled = false}) ->
ok;
cacel_batch_timer(#batch{enabled = true, timer = TRef}) ->
erlang:cancel_timer(TRef).
add_direct_route(Route) ->
mnesia:async_dirty(fun mnesia:write/3, [?ROUTE, Route, sticky_write]).
@ -200,9 +239,25 @@ add_trie_route(Route = #route{topic = Topic}) ->
end,
mnesia:write(?ROUTE, Route, sticky_write).
del_direct_route(Route, State = #state{batch = ?BATCH(false)}) ->
del_direct_route(Route), State;
del_direct_route(Route, State = #state{batch = Batch = ?BATCH(true, Pending)}) ->
State#state{batch = Batch#batch{pending = sets:add_element(Route, Pending)}}.
del_direct_route(Route) ->
mnesia:async_dirty(fun mnesia:delete_object/3, [?ROUTE, Route, sticky_write]).
del_direct_routes([]) ->
ok;
del_direct_routes(Routes) ->
DelFun = fun(R = #route{topic = Topic}) ->
case ets:member(emqx_subscriber, Topic) of
true -> ok;
false -> mnesia:delete_object(?ROUTE, R, sticky_write)
end
end,
mnesia:async_dirty(fun lists:foreach/2, [DelFun, Routes]).
del_trie_route(Route = #route{topic = Topic}) ->
case mnesia:wread({?ROUTE, Topic}) of
[Route] -> %% Remove route and trie

View File

@ -1,91 +0,0 @@
%%%===================================================================
%%% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%%%
%%% Licensed under the Apache License, Version 2.0 (the "License");
%%% you may not use this file except in compliance with the License.
%%% You may obtain a copy of the License at
%%%
%%% http://www.apache.org/licenses/LICENSE-2.0
%%%
%%% Unless required by applicable law or agreed to in writing, software
%%% distributed under the License is distributed on an "AS IS" BASIS,
%%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%%% See the License for the specific language governing permissions and
%%% limitations under the License.
%%%===================================================================
-module(emqx_serializer_SUITE).
-compile(export_all).
-include("emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl").
-import(emqx_serializer, [serialize/1]).
all() ->
[serialize_connect,
serialize_connack,
serialize_publish,
serialize_puback,
serialize_pubrel,
serialize_subscribe,
serialize_suback,
serialize_unsubscribe,
serialize_unsuback,
serialize_pingreq,
serialize_pingresp,
serialize_disconnect].
serialize_connect(_) ->
serialize(?CONNECT_PACKET(#mqtt_packet_connect{})),
serialize(?CONNECT_PACKET(#mqtt_packet_connect{
client_id = <<"clientId">>,
will_qos = ?QOS1,
will_flag = true,
will_retain = true,
will_topic = <<"will">>,
will_payload = <<"haha">>,
clean_sess = true})).
serialize_connack(_) ->
ConnAck = #mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK},
variable = #mqtt_packet_connack{ack_flags = 0, return_code = 0}},
?assertEqual(<<32,2,0,0>>, iolist_to_binary(serialize(ConnAck))).
serialize_publish(_) ->
serialize(?PUBLISH_PACKET(?QOS_0, <<"Topic">>, undefined, <<"Payload">>)),
serialize(?PUBLISH_PACKET(?QOS_1, <<"Topic">>, 938, <<"Payload">>)),
serialize(?PUBLISH_PACKET(?QOS_2, <<"Topic">>, 99, long_payload())).
serialize_puback(_) ->
serialize(?PUBACK_PACKET(?PUBACK, 10384)).
serialize_pubrel(_) ->
serialize(?PUBREL_PACKET(10384)).
serialize_subscribe(_) ->
TopicTable = [{<<"TopicQos0">>, ?QOS_0}, {<<"TopicQos1">>, ?QOS_1}, {<<"TopicQos2">>, ?QOS_2}],
serialize(?SUBSCRIBE_PACKET(10, TopicTable)).
serialize_suback(_) ->
serialize(?SUBACK_PACKET(10, [?QOS_0, ?QOS_1, 128])).
serialize_unsubscribe(_) ->
serialize(?UNSUBSCRIBE_PACKET(10, [<<"Topic1">>, <<"Topic2">>])).
serialize_unsuback(_) ->
serialize(?UNSUBACK_PACKET(10)).
serialize_pingreq(_) ->
serialize(?PACKET(?PINGREQ)).
serialize_pingresp(_) ->
serialize(?PACKET(?PINGRESP)).
serialize_disconnect(_) ->
serialize(?PACKET(?DISCONNECT)).
long_payload() ->
iolist_to_binary(["payload." || _I <- lists:seq(1, 100)]).