diff --git a/src/emqttd_router.erl b/src/emqttd_router.erl index 176d4ec3e..7b9999a12 100644 --- a/src/emqttd_router.erl +++ b/src/emqttd_router.erl @@ -55,12 +55,10 @@ -type topic() :: binary(). %% @doc Start a local router. --spec start_link(atom(), pos_integer(), fun(), list()) -> {ok, pid()} | {error, any}. +-spec start_link(atom(), pos_integer(), fun((atom()) -> ok), list()) -> {ok, pid()} | {error, any()}. start_link(Pool, Id, StatsFun, Env) -> - gen_server2:start_link({local, name(Id)}, ?MODULE, [Pool, Id, StatsFun, Env], []). - -name(Id) -> - list_to_atom("emqttd_router_" ++ integer_to_list(Id)). + gen_server2:start_link({local, emqttd:reg_name(?MODULE,Id)}, + ?MODULE, [Pool, Id, StatsFun, Env], []). %% @doc Route Message on the local node. -spec route(topic(), mqtt_message()) -> any(). @@ -180,23 +178,23 @@ start_tick(Secs) -> handle_call({add_route, Topic, Pid}, _From, State) -> ets:insert(route, {Topic, Pid}), - {reply, ok, State}; + {reply, ok, setstats(State)}; handle_call({add_routes, Topics, Pid}, _From, State) -> ets:insert(route, [{Topic, Pid} || Topic <- Topics]), - {reply, ok, State}; + {reply, ok, setstats(State)}; handle_call(Req, _From, State) -> ?UNEXPECTED_REQ(Req, State). handle_cast({delete_route, Topic, Pid}, State = #state{aging = Aging}) -> ets:delete_object(route, {Topic, Pid}), + NewState = case has_route(Topic) of - false -> - {noreply, State#state{aging = store_aged(Topic, Aging)}}; - true -> - {noreply, State} - end; + false -> State#state{aging = store_aged(Topic, Aging)}; + true -> State + end, + {noreply, setstats(NewState)}; handle_cast({delete_routes, Topics, Pid}, State) -> NewAging = @@ -207,7 +205,7 @@ handle_cast({delete_routes, Topics, Pid}, State) -> true -> Aging end end, State#state.aging, Topics), - {noreply, State#state{aging = NewAging}}; + {noreply, setstats(State#state{aging = NewAging})}; handle_cast(Msg, State) -> ?UNEXPECTED_MSG(Msg, State). @@ -280,3 +278,6 @@ store_aged(Topic, Aging = #aging{topics = Dict}) -> Now = emqttd_util:now_to_secs(), Aging#aging{topics = dict:store(Topic, Now, Dict)}. +setstats(State = #state{statsfun = StatsFun}) -> + StatsFun(route), State. +