From ba5541c768acf342f03d74d3d3c65002f6b29cd4 Mon Sep 17 00:00:00 2001 From: Feng Date: Tue, 12 Jan 2016 14:02:21 +0800 Subject: [PATCH] Move the aging code to router --- src/emqttd_pubsub_helper.erl | 142 ++++------------------------------- 1 file changed, 16 insertions(+), 126 deletions(-) diff --git a/src/emqttd_pubsub_helper.erl b/src/emqttd_pubsub_helper.erl index 4ae8452fb..820280445 100644 --- a/src/emqttd_pubsub_helper.erl +++ b/src/emqttd_pubsub_helper.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% Copyright (c) 2012-2016 eMQTT.IO, All Rights Reserved. %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal @@ -19,176 +19,66 @@ %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %%% SOFTWARE. %%%----------------------------------------------------------------------------- -%%% @doc PubSub Route Aging Helper +%%% @doc PubSub Helper. %%% %%% @author Feng Lee %%%----------------------------------------------------------------------------- -module(emqttd_pubsub_helper). --behaviour(gen_server2). +-behaviour(gen_server). -include("emqttd.hrl"). -include("emqttd_internal.hrl"). %% API Function Exports --export([start_link/2, aging/1]). +-export([start_link/1]). %% gen_server Function Exports -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --ifdef(TEST). --compile(export_all). --endif. - --record(aging, {topics, time, tref}). - --record(state, {aging :: #aging{}, statsfun}). +-record(state, {statsfun}). -define(SERVER, ?MODULE). --define(ROUTER, emqttd_router). - -%%%============================================================================= -%%% API -%%%============================================================================= - -%%------------------------------------------------------------------------------ -%% @doc Start pubsub helper. -%% @end -%%------------------------------------------------------------------------------ --spec start_link(fun(), list(tuple())) -> {ok, pid()} | ignore | {error, any()}. -start_link(StatsFun, Opts) -> - gen_server2:start_link({local, ?SERVER}, ?MODULE, [StatsFun, Opts], []). - -%%------------------------------------------------------------------------------ -%% @doc Aging topics -%% @end -%%------------------------------------------------------------------------------ --spec aging(list(binary())) -> ok. -aging(Topics) -> - gen_server2:cast(?SERVER, {aging, Topics}). +%% @doc Start PubSub Helper. +-spec start_link(fun()) -> {ok, pid()} | ignore | {error, any()}. +start_link(StatsFun) -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [StatsFun], []). %%%============================================================================= %%% gen_server callbacks %%%============================================================================= -init([StatsFun, Opts]) -> +init([StatsFun]) -> mnesia:subscribe(system), - - AgingSecs = proplists:get_value(route_aging, Opts, 5), - - %% Aging Timer - {ok, AgingTref} = start_tick(AgingSecs div 2), - - {ok, #state{aging = #aging{topics = dict:new(), - time = AgingSecs, - tref = AgingTref}, - statsfun = StatsFun}}. - -start_tick(Secs) -> - timer:send_interval(timer:seconds(Secs), {clean, aged}). + {ok, #state{statsfun = StatsFun}}. handle_call(Req, _From, State) -> ?UNEXPECTED_REQ(Req, State). -handle_cast({aging, Topics}, State = #state{aging = Aging}) -> - #aging{topics = Dict} = Aging, - TS = emqttd_util:now_to_secs(), - Dict1 = - lists:foldl(fun(Topic, Acc) -> - case dict:find(Topic, Acc) of - {ok, _} -> Acc; - error -> dict:store(Topic, TS, Acc) - end - end, Dict, Topics), - {noreply, State#state{aging = Aging#aging{topics = Dict1}}}; - handle_cast(Msg, State) -> ?UNEXPECTED_MSG(Msg, State). -handle_info({clean, aged}, State = #state{aging = Aging}) -> - - #aging{topics = Dict, time = Time} = Aging, - - ByTime = emqttd_util:now_to_secs() - Time, - - Dict1 = try_clean(ByTime, dict:to_list(Dict)), - - NewAging = Aging#aging{topics = dict:from_list(Dict1)}, - - noreply(State#state{aging = NewAging}); - handle_info({mnesia_system_event, {mnesia_down, Node}}, State) -> - %% mnesia master? + %% TODO: mnesia master? Pattern = #mqtt_topic{_ = '_', node = Node}, F = fun() -> [mnesia:delete_object(topic, R, write) || R <- mnesia:match_object(topic, Pattern, write)] end, - mnesia:async_dirty(F), - noreply(State); + mnesia:transaction(F), noreply(State); handle_info(Info, State) -> ?UNEXPECTED_INFO(Info, State). -terminate(_Reason, #state{aging = #aging{tref = TRef}}) -> - timer:cancel(TRef). +terminate(_Reason, _State) -> + mnesia:unsubscribe(system). code_change(_OldVsn, State, _Extra) -> {ok, State}. -%%%============================================================================= -%%% Internal Functions -%%%============================================================================= - noreply(State = #state{statsfun = StatsFun}) -> - StatsFun(topic), - {noreply, State, hibernate}. - -try_clean(ByTime, List) -> - try_clean(ByTime, List, []). - -try_clean(_ByTime, [], Acc) -> - Acc; - -try_clean(ByTime, [{Topic, TS} | Left], Acc) -> - case ?ROUTER:has_route(Topic) of - false -> - try_clean2(ByTime, {Topic, TS}, Left, Acc); - true -> - try_clean(ByTime, Left, Acc) - end. - -try_clean2(ByTime, {Topic, TS}, Left, Acc) when TS > ByTime -> - try_clean(ByTime, Left, [{Topic, TS}|Acc]); - -try_clean2(ByTime, {Topic, _TS}, Left, Acc) -> - TopicR = #mqtt_topic{topic = Topic, node = node()}, - mnesia:transaction(fun try_remove_topic/1, [TopicR]), - try_clean(ByTime, Left, Acc). - -try_remove_topic(TopicR = #mqtt_topic{topic = Topic}) -> - %% Lock topic first - case mnesia:wread({topic, Topic}) of - [] -> ok; - [TopicR] -> - if_no_route(Topic, fun() -> - %% Remove topic and trie - mnesia:delete_object(topic, TopicR, write), - emqttd_trie:delete(Topic) - end); - _More -> - if_no_route(Topic, fun() -> - %% Remove topic - mnesia:delete_object(topic, TopicR, write) - end) - end. - -if_no_route(Topic, Fun) -> - case ?ROUTER:has_route(Topic) of - true -> ok; - false -> Fun() - end. + StatsFun(topic), {noreply, State}.