From 284abc73885ab7a811dd73b497057aa28b001c52 Mon Sep 17 00:00:00 2001 From: Feng Date: Sun, 17 Jan 2016 20:03:47 +0800 Subject: [PATCH] fix reverse_route stats --- src/emqttd_pubsub.erl | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/src/emqttd_pubsub.erl b/src/emqttd_pubsub.erl index ee4b5968b..5ef0b915f 100644 --- a/src/emqttd_pubsub.erl +++ b/src/emqttd_pubsub.erl @@ -126,10 +126,8 @@ cache_env(Key) -> StatsFun :: fun((atom()) -> any()), Opts :: list(tuple()). start_link(Pool, Id, StatsFun, Opts) -> - gen_server2:start_link({local, name(Id)}, ?MODULE, [Pool, Id, StatsFun, Opts], []). - -name(Id) -> - list_to_atom("emqttd_pubsub_" ++ integer_to_list(Id)). + gen_server2:start_link({local, emqttd:reg_name(?MODULE, Id)}, + ?MODULE, [Pool, Id, StatsFun, Opts], []). %% @doc Create Topic or Subscription. -spec create(topic, emqttd_topic:topic()) -> ok | {error, any()}; @@ -267,7 +265,7 @@ handle_call({subscribe, {SubId, SubPid}, TopicTable}, _From, insert_reverse_routes(SubPid, NewTopics), - StatsFun(route), + StatsFun(reverse_route), %% Insert topic records to mnesia Records = [#mqtt_topic{topic = Topic, node = node()} || Topic <- NewTopics], @@ -298,6 +296,8 @@ handle_cast({unsubscribe, {SubId, SubPid}, Topics}, State = #state{statsfun = St delete_reverse_routes(SubPid, Topics), + StatsFun(reverse_route), + %% Remove subscriptions if_subscription( fun(_) -> @@ -305,12 +305,13 @@ handle_cast({unsubscribe, {SubId, SubPid}, Topics}, State = #state{statsfun = St emqttd_pooler:async_submit({mnesia, async_dirty, Args}), StatsFun(subscription) end), + {noreply, State}; handle_cast(Msg, State) -> ?UNEXPECTED_MSG(Msg, State). -handle_info({'DOWN', _Mon, _Type, DownPid, _Info}, State) -> +handle_info({'DOWN', _Mon, _Type, DownPid, _Info}, State = #state{statsfun = StatsFun}) -> Topics = reverse_routes(DownPid), @@ -318,6 +319,8 @@ handle_info({'DOWN', _Mon, _Type, DownPid, _Info}, State) -> delete_reverse_routes(DownPid), + StatsFun(reverse_route), + {noreply, State, hibernate}; handle_info(Info, State) ->