fix reverse_route stats
This commit is contained in:
parent
51c5ea1f96
commit
284abc7388
|
@ -126,10 +126,8 @@ cache_env(Key) ->
|
||||||
StatsFun :: fun((atom()) -> any()),
|
StatsFun :: fun((atom()) -> any()),
|
||||||
Opts :: list(tuple()).
|
Opts :: list(tuple()).
|
||||||
start_link(Pool, Id, StatsFun, Opts) ->
|
start_link(Pool, Id, StatsFun, Opts) ->
|
||||||
gen_server2:start_link({local, name(Id)}, ?MODULE, [Pool, Id, StatsFun, Opts], []).
|
gen_server2:start_link({local, emqttd:reg_name(?MODULE, Id)},
|
||||||
|
?MODULE, [Pool, Id, StatsFun, Opts], []).
|
||||||
name(Id) ->
|
|
||||||
list_to_atom("emqttd_pubsub_" ++ integer_to_list(Id)).
|
|
||||||
|
|
||||||
%% @doc Create Topic or Subscription.
|
%% @doc Create Topic or Subscription.
|
||||||
-spec create(topic, emqttd_topic:topic()) -> ok | {error, any()};
|
-spec create(topic, emqttd_topic:topic()) -> ok | {error, any()};
|
||||||
|
@ -267,7 +265,7 @@ handle_call({subscribe, {SubId, SubPid}, TopicTable}, _From,
|
||||||
|
|
||||||
insert_reverse_routes(SubPid, NewTopics),
|
insert_reverse_routes(SubPid, NewTopics),
|
||||||
|
|
||||||
StatsFun(route),
|
StatsFun(reverse_route),
|
||||||
|
|
||||||
%% Insert topic records to mnesia
|
%% Insert topic records to mnesia
|
||||||
Records = [#mqtt_topic{topic = Topic, node = node()} || Topic <- NewTopics],
|
Records = [#mqtt_topic{topic = Topic, node = node()} || Topic <- NewTopics],
|
||||||
|
@ -298,6 +296,8 @@ handle_cast({unsubscribe, {SubId, SubPid}, Topics}, State = #state{statsfun = St
|
||||||
|
|
||||||
delete_reverse_routes(SubPid, Topics),
|
delete_reverse_routes(SubPid, Topics),
|
||||||
|
|
||||||
|
StatsFun(reverse_route),
|
||||||
|
|
||||||
%% Remove subscriptions
|
%% Remove subscriptions
|
||||||
if_subscription(
|
if_subscription(
|
||||||
fun(_) ->
|
fun(_) ->
|
||||||
|
@ -305,12 +305,13 @@ handle_cast({unsubscribe, {SubId, SubPid}, Topics}, State = #state{statsfun = St
|
||||||
emqttd_pooler:async_submit({mnesia, async_dirty, Args}),
|
emqttd_pooler:async_submit({mnesia, async_dirty, Args}),
|
||||||
StatsFun(subscription)
|
StatsFun(subscription)
|
||||||
end),
|
end),
|
||||||
|
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
|
|
||||||
handle_cast(Msg, State) ->
|
handle_cast(Msg, State) ->
|
||||||
?UNEXPECTED_MSG(Msg, State).
|
?UNEXPECTED_MSG(Msg, State).
|
||||||
|
|
||||||
handle_info({'DOWN', _Mon, _Type, DownPid, _Info}, State) ->
|
handle_info({'DOWN', _Mon, _Type, DownPid, _Info}, State = #state{statsfun = StatsFun}) ->
|
||||||
|
|
||||||
Topics = reverse_routes(DownPid),
|
Topics = reverse_routes(DownPid),
|
||||||
|
|
||||||
|
@ -318,6 +319,8 @@ handle_info({'DOWN', _Mon, _Type, DownPid, _Info}, State) ->
|
||||||
|
|
||||||
delete_reverse_routes(DownPid),
|
delete_reverse_routes(DownPid),
|
||||||
|
|
||||||
|
StatsFun(reverse_route),
|
||||||
|
|
||||||
{noreply, State, hibernate};
|
{noreply, State, hibernate};
|
||||||
|
|
||||||
handle_info(Info, State) ->
|
handle_info(Info, State) ->
|
||||||
|
|
Loading…
Reference in New Issue