trans, and stats_timer

This commit is contained in:
Feng 2016-03-14 15:47:31 +08:00
parent 50a11ce6c9
commit 9584f2a990
1 changed files with 28 additions and 12 deletions

View File

@ -26,8 +26,10 @@
-boot_mnesia({mnesia, [boot]}). -boot_mnesia({mnesia, [boot]}).
-copy_mnesia({mnesia, [copy]}). -copy_mnesia({mnesia, [copy]}).
%% Start/Stop
-export([start_link/0, stop/0]). -export([start_link/0, stop/0]).
%% Route APIs
-export([add_route/1, add_route/2, add_routes/1, lookup/1, print/1, -export([add_route/1, add_route/2, add_routes/1, lookup/1, print/1,
del_route/1, del_route/2, del_routes/1, has_route/1]). del_route/1, del_route/2, del_routes/1, has_route/1]).
@ -35,7 +37,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]). terminate/2, code_change/3]).
-record(state, {}). -record(state, {stats_timer}).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Mnesia Bootstrap %% Mnesia Bootstrap
@ -89,10 +91,10 @@ add_route(Topic, Node) when is_binary(Topic), is_atom(Node) ->
%% @doc Add Routes %% @doc Add Routes
-spec(add_routes([mqtt_route()]) -> ok | {errory, Reason :: any()}). -spec(add_routes([mqtt_route()]) -> ok | {errory, Reason :: any()}).
add_routes(Routes) -> add_routes(Routes) ->
Add = fun() -> [add_route_(Route) || Route <- Routes] end, AddFun = fun() -> [add_route_(Route) || Route <- Routes] end,
case mnesia:transaction(Add) of case mnesia:is_transaction() of
{atomic, _} -> update_stats_(), ok; true -> AddFun();
{aborted, Error} -> {error, Error} false -> trans(AddFun)
end. end.
%% @private %% @private
@ -125,10 +127,10 @@ del_route(Topic, Node) when is_binary(Topic), is_atom(Node) ->
%% @doc Delete Routes %% @doc Delete Routes
-spec(del_routes([mqtt_route()]) -> ok | {error, any()}). -spec(del_routes([mqtt_route()]) -> ok | {error, any()}).
del_routes(Routes) -> del_routes(Routes) ->
Del = fun() -> [del_route_(Route) || Route <- Routes] end, DelFun = fun() -> [del_route_(Route) || Route <- Routes] end,
case mnesia:transaction(Del) of case mnesia:is_transaction() of
{atomic, _} -> update_stats_(), ok; true -> DelFun();
{aborted, Error} -> {error, Error} false -> trans(DelFun)
end. end.
del_route_(Route = #mqtt_route{topic = Topic}) -> del_route_(Route = #mqtt_route{topic = Topic}) ->
@ -156,6 +158,14 @@ has_route(Topic) ->
end, end,
length(Routes) > 0. length(Routes) > 0.
%% @private
-spec(trans(function()) -> ok | {error, any()}).
trans(Fun) ->
case mnesia:transaction(Fun) of
{atomic, _} -> ok;
{aborted, Error} -> {error, Error}
end.
stop() -> gen_server:call(?MODULE, stop). stop() -> gen_server:call(?MODULE, stop).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -164,7 +174,8 @@ stop() -> gen_server:call(?MODULE, stop).
init([]) -> init([]) ->
mnesia:subscribe(system), mnesia:subscribe(system),
{ok, #state{}}. {ok, TRef} = timer:send_interval(timer:seconds(1), stats),
{ok, #state{stats_timer = TRef}}.
handle_call(stop, _From, State) -> handle_call(stop, _From, State) ->
{stop, normal, ok, State}; {stop, normal, ok, State};
@ -183,7 +194,7 @@ handle_info({mnesia_system_event, {mnesia_down, Node}}, State) ->
lager:error("Mnesia down: ~p~n", [Node]), lager:error("Mnesia down: ~p~n", [Node]),
clean_routes_(Node), clean_routes_(Node),
update_stats_(), update_stats_(),
{noreply, State}; {noreply, State, hibernate};
handle_info({mnesia_system_event, {inconsistent_database, Context, Node}}, State) -> handle_info({mnesia_system_event, {inconsistent_database, Context, Node}}, State) ->
%% 1. Backup and restart %% 1. Backup and restart
@ -198,10 +209,15 @@ handle_info({mnesia_system_event, {mnesia_overload, Details}}, State) ->
handle_info({mnesia_system_event, _Event}, State) -> handle_info({mnesia_system_event, _Event}, State) ->
{noreply, State}; {noreply, State};
handle_info(stats, State) ->
update_stats_(),
{noreply, State, hibernate};
handle_info(_Info, State) -> handle_info(_Info, State) ->
{noreply, State}. {noreply, State}.
terminate(_Reason, _State) -> terminate(_Reason, #state{stats_timer = TRef}) ->
timer:cancel(TRef),
mnesia:unsubscribe(system). mnesia:unsubscribe(system).
code_change(_OldVsn, State, _Extra) -> code_change(_OldVsn, State, _Extra) ->