setstats after route table changed
This commit is contained in:
parent
43165a5e8a
commit
5bfd3a784e
|
@ -55,12 +55,10 @@
|
|||
-type topic() :: binary().
|
||||
|
||||
%% @doc Start a local router.
|
||||
-spec start_link(atom(), pos_integer(), fun(), list()) -> {ok, pid()} | {error, any}.
|
||||
-spec start_link(atom(), pos_integer(), fun((atom()) -> ok), list()) -> {ok, pid()} | {error, any()}.
|
||||
start_link(Pool, Id, StatsFun, Env) ->
|
||||
gen_server2:start_link({local, name(Id)}, ?MODULE, [Pool, Id, StatsFun, Env], []).
|
||||
|
||||
name(Id) ->
|
||||
list_to_atom("emqttd_router_" ++ integer_to_list(Id)).
|
||||
gen_server2:start_link({local, emqttd:reg_name(?MODULE,Id)},
|
||||
?MODULE, [Pool, Id, StatsFun, Env], []).
|
||||
|
||||
%% @doc Route Message on the local node.
|
||||
-spec route(topic(), mqtt_message()) -> any().
|
||||
|
@ -180,23 +178,23 @@ start_tick(Secs) ->
|
|||
|
||||
handle_call({add_route, Topic, Pid}, _From, State) ->
|
||||
ets:insert(route, {Topic, Pid}),
|
||||
{reply, ok, State};
|
||||
{reply, ok, setstats(State)};
|
||||
|
||||
handle_call({add_routes, Topics, Pid}, _From, State) ->
|
||||
ets:insert(route, [{Topic, Pid} || Topic <- Topics]),
|
||||
{reply, ok, State};
|
||||
{reply, ok, setstats(State)};
|
||||
|
||||
handle_call(Req, _From, State) ->
|
||||
?UNEXPECTED_REQ(Req, State).
|
||||
|
||||
handle_cast({delete_route, Topic, Pid}, State = #state{aging = Aging}) ->
|
||||
ets:delete_object(route, {Topic, Pid}),
|
||||
NewState =
|
||||
case has_route(Topic) of
|
||||
false ->
|
||||
{noreply, State#state{aging = store_aged(Topic, Aging)}};
|
||||
true ->
|
||||
{noreply, State}
|
||||
end;
|
||||
false -> State#state{aging = store_aged(Topic, Aging)};
|
||||
true -> State
|
||||
end,
|
||||
{noreply, setstats(NewState)};
|
||||
|
||||
handle_cast({delete_routes, Topics, Pid}, State) ->
|
||||
NewAging =
|
||||
|
@ -207,7 +205,7 @@ handle_cast({delete_routes, Topics, Pid}, State) ->
|
|||
true -> Aging
|
||||
end
|
||||
end, State#state.aging, Topics),
|
||||
{noreply, State#state{aging = NewAging}};
|
||||
{noreply, setstats(State#state{aging = NewAging})};
|
||||
|
||||
handle_cast(Msg, State) ->
|
||||
?UNEXPECTED_MSG(Msg, State).
|
||||
|
@ -280,3 +278,6 @@ store_aged(Topic, Aging = #aging{topics = Dict}) ->
|
|||
Now = emqttd_util:now_to_secs(),
|
||||
Aging#aging{topics = dict:store(Topic, Now, Dict)}.
|
||||
|
||||
setstats(State = #state{statsfun = StatsFun}) ->
|
||||
StatsFun(route), State.
|
||||
|
||||
|
|
Loading…
Reference in New Issue