diff --git a/src/emqttd_router.erl b/src/emqttd_router.erl index 13d8b197b..732cb448d 100644 --- a/src/emqttd_router.erl +++ b/src/emqttd_router.erl @@ -26,8 +26,10 @@ -boot_mnesia({mnesia, [boot]}). -copy_mnesia({mnesia, [copy]}). +%% Start/Stop -export([start_link/0, stop/0]). +%% Route APIs -export([add_route/1, add_route/2, add_routes/1, lookup/1, print/1, del_route/1, del_route/2, del_routes/1, has_route/1]). @@ -35,7 +37,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {}). +-record(state, {stats_timer}). %%-------------------------------------------------------------------- %% Mnesia Bootstrap @@ -89,10 +91,10 @@ add_route(Topic, Node) when is_binary(Topic), is_atom(Node) -> %% @doc Add Routes -spec(add_routes([mqtt_route()]) -> ok | {errory, Reason :: any()}). add_routes(Routes) -> - Add = fun() -> [add_route_(Route) || Route <- Routes] end, - case mnesia:transaction(Add) of - {atomic, _} -> update_stats_(), ok; - {aborted, Error} -> {error, Error} + AddFun = fun() -> [add_route_(Route) || Route <- Routes] end, + case mnesia:is_transaction() of + true -> AddFun(); + false -> trans(AddFun) end. %% @private @@ -125,10 +127,10 @@ del_route(Topic, Node) when is_binary(Topic), is_atom(Node) -> %% @doc Delete Routes -spec(del_routes([mqtt_route()]) -> ok | {error, any()}). del_routes(Routes) -> - Del = fun() -> [del_route_(Route) || Route <- Routes] end, - case mnesia:transaction(Del) of - {atomic, _} -> update_stats_(), ok; - {aborted, Error} -> {error, Error} + DelFun = fun() -> [del_route_(Route) || Route <- Routes] end, + case mnesia:is_transaction() of + true -> DelFun(); + false -> trans(DelFun) end. del_route_(Route = #mqtt_route{topic = Topic}) -> @@ -156,6 +158,14 @@ has_route(Topic) -> end, length(Routes) > 0. +%% @private +-spec(trans(function()) -> ok | {error, any()}). +trans(Fun) -> + case mnesia:transaction(Fun) of + {atomic, _} -> ok; + {aborted, Error} -> {error, Error} + end. + stop() -> gen_server:call(?MODULE, stop). %%-------------------------------------------------------------------- @@ -164,7 +174,8 @@ stop() -> gen_server:call(?MODULE, stop). init([]) -> mnesia:subscribe(system), - {ok, #state{}}. + {ok, TRef} = timer:send_interval(timer:seconds(1), stats), + {ok, #state{stats_timer = TRef}}. handle_call(stop, _From, State) -> {stop, normal, ok, State}; @@ -183,7 +194,7 @@ handle_info({mnesia_system_event, {mnesia_down, Node}}, State) -> lager:error("Mnesia down: ~p~n", [Node]), clean_routes_(Node), update_stats_(), - {noreply, State}; + {noreply, State, hibernate}; handle_info({mnesia_system_event, {inconsistent_database, Context, Node}}, State) -> %% 1. Backup and restart @@ -198,10 +209,15 @@ handle_info({mnesia_system_event, {mnesia_overload, Details}}, State) -> handle_info({mnesia_system_event, _Event}, State) -> {noreply, State}; +handle_info(stats, State) -> + update_stats_(), + {noreply, State, hibernate}; + handle_info(_Info, State) -> {noreply, State}. -terminate(_Reason, _State) -> +terminate(_Reason, #state{stats_timer = TRef}) -> + timer:cancel(TRef), mnesia:unsubscribe(system). code_change(_OldVsn, State, _Extra) ->