diff --git a/Makefile b/Makefile index 368aff6a9..de4a2c8c0 100644 --- a/Makefile +++ b/Makefile @@ -38,7 +38,7 @@ EUNIT_OPTS = verbose CT_SUITES = emqx emqx_zone emqx_banned emqx_connection emqx_session emqx_access emqx_broker emqx_cm emqx_frame emqx_guid emqx_inflight \ emqx_json emqx_keepalive emqx_lib emqx_metrics emqx_misc emqx_mod emqx_mqtt_caps \ emqx_mqtt_compat emqx_mqtt_props emqx_mqueue emqx_net emqx_pqueue emqx_router emqx_sm \ - emqx_stats emqx_tables emqx_time emqx_topic emqx_trie emqx_vm \ + emqx_tables emqx_time emqx_topic emqx_trie emqx_vm \ emqx_mountpoint emqx_listeners emqx_protocol emqx_pool emqx_shared_sub CT_NODE_NAME = emqxct@127.0.0.1 diff --git a/src/emqx_broker_helper.erl b/src/emqx_broker_helper.erl index fecf98a7b..e597a233e 100644 --- a/src/emqx_broker_helper.erl +++ b/src/emqx_broker_helper.erl @@ -19,6 +19,9 @@ -export([start_link/0]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). +%% internal export +-export([stats_fun/0]). + -define(HELPER, ?MODULE). -record(state, {}). @@ -32,7 +35,9 @@ start_link() -> %%------------------------------------------------------------------------------ init([]) -> - emqx_stats:update_interval(broker_stats, stats_fun()), + %% Use M:F/A for callback, not anonymous function because + %% fun M:F/A is small, also no badfun risk during hot beam reload + emqx_stats:update_interval(broker_stats, fun ?MODULE:stats_fun/0), {ok, #state{}, hibernate}. handle_call(Req, _From, State) -> @@ -58,14 +63,12 @@ code_change(_OldVsn, State, _Extra) -> %%------------------------------------------------------------------------------ stats_fun() -> - fun() -> - safe_update_stats(emqx_subscriber, - 'subscribers/count', 'subscribers/max'), - safe_update_stats(emqx_subscription, - 'subscriptions/count', 'subscriptions/max'), - safe_update_stats(emqx_suboptions, - 'suboptions/count', 'suboptions/max') - end. + safe_update_stats(emqx_subscriber, + 'subscribers/count', 'subscribers/max'), + safe_update_stats(emqx_subscription, + 'subscriptions/count', 'subscriptions/max'), + safe_update_stats(emqx_suboptions, + 'suboptions/count', 'suboptions/max'). safe_update_stats(Tab, Stat, MaxStat) -> case ets:info(Tab, size) of diff --git a/src/emqx_stats.erl b/src/emqx_stats.erl index 510b2d91a..61ff6cbc3 100644 --- a/src/emqx_stats.erl +++ b/src/emqx_stats.erl @@ -18,7 +18,7 @@ -include("emqx.hrl"). --export([start_link/0]). +-export([start_link/0, start_link/1, stop/0]). %% Stats API. -export([getstats/0, getstat/1]). @@ -31,7 +31,8 @@ code_change/3]). -record(update, {name, countdown, interval, func}). --record(state, {timer, updates :: [#update{}]}). +-record(state, {timer, updates :: [#update{}], + tick_ms :: timeout()}). -type(stats() :: list({atom(), non_neg_integer()})). @@ -77,10 +78,20 @@ -define(TAB, ?MODULE). -define(SERVER, ?MODULE). +-type opts() :: #{tick_ms := timeout()}. + %% @doc Start stats server -spec(start_link() -> emqx_types:startlink_ret()). start_link() -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + start_link(#{tick_ms => timer:seconds(1)}). + +-spec(start_link(opts()) -> emqx_types:startlink_ret()). +start_link(Opts) -> + gen_server:start_link({local, ?SERVER}, ?MODULE, Opts, []). + +-spec(stop() -> ok). +stop() -> + gen_server:call(?SERVER, stop, infinity). %% @doc Generate stats fun -spec(statsfun(Stat :: atom()) -> fun()). @@ -140,16 +151,18 @@ cast(Msg) -> %% gen_server callbacks %%------------------------------------------------------------------------------ -init([]) -> +init(#{tick_ms := TickMs}) -> _ = emqx_tables:new(?TAB, [set, public, {write_concurrency, true}]), Stats = lists:append([?CONNECTION_STATS, ?SESSION_STATS, ?PUBSUB_STATS, ?ROUTE_STATS, ?RETAINED_STATS]), true = ets:insert(?TAB, [{Name, 0} || Name <- Stats]), - {ok, start_timer(#state{updates = []}), hibernate}. + {ok, start_timer(#state{updates = [], tick_ms = TickMs}), hibernate}. -start_timer(State) -> - State#state{timer = emqx_misc:start_timer(timer:seconds(1), tick)}. +start_timer(#state{tick_ms = Ms} = State) -> + State#state{timer = emqx_misc:start_timer(Ms, tick)}. +handle_call(stop, _From, State) -> + {stop, normal, _Reply = ok, State}; handle_call(Req, _From, State) -> emqx_logger:error("[Stats] unexpected call: ~p", [Req]), {reply, ignored, State}. @@ -201,7 +214,7 @@ handle_info(Info, State) -> {noreply, State}. terminate(_Reason, #state{timer = TRef}) -> - timer:cancel(TRef). + emqx_misc:cancel_timer(TRef). code_change(_OldVsn, State, _Extra) -> {ok, State}. diff --git a/test/emqx_stats_SUITE.erl b/test/emqx_stats_SUITE.erl deleted file mode 100644 index 5c7254468..000000000 --- a/test/emqx_stats_SUITE.erl +++ /dev/null @@ -1,56 +0,0 @@ -%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. - --module(emqx_stats_SUITE). - --compile(export_all). --compile(nowarn_export_all). - --include_lib("common_test/include/ct.hrl"). - -all() -> [t_set_get_state, t_update_interval]. - -t_set_get_state(_) -> - emqx_stats:start_link(), - SetConnsCount = emqx_stats:statsfun('connections/count'), - SetConnsCount(1), - 1 = emqx_stats:getstat('connections/count'), - emqx_stats:setstat('connections/count', 2), - 2 = emqx_stats:getstat('connections/count'), - emqx_stats:setstat('connections/count', 'connections/max', 3), - timer:sleep(100), - 3 = emqx_stats:getstat('connections/count'), - 3 = emqx_stats:getstat('connections/max'), - emqx_stats:setstat('connections/count', 'connections/max', 2), - timer:sleep(100), - 2 = emqx_stats:getstat('connections/count'), - 3 = emqx_stats:getstat('connections/max'), - SetConns = emqx_stats:statsfun('connections/count', 'connections/max'), - SetConns(4), - timer:sleep(100), - 4 = emqx_stats:getstat('connections/count'), - 4 = emqx_stats:getstat('connections/max'), - Conns = emqx_stats:getstats(), - 4 = proplists:get_value('connections/count', Conns), - 4 = proplists:get_value('connections/max', Conns). - -t_update_interval(_) -> - emqx_stats:start_link(), - emqx_stats:cancel_update(cm_stats), - ok = emqx_stats:update_interval(stats_test, fun update_stats/0), - timer:sleep(2500), - 1 = emqx_stats:getstat('connections/count'). - -update_stats() -> - emqx_stats:setstat('connections/count', 1). diff --git a/test/emqx_stats_tests.erl b/test/emqx_stats_tests.erl new file mode 100644 index 000000000..5c3a9c803 --- /dev/null +++ b/test/emqx_stats_tests.erl @@ -0,0 +1,89 @@ +%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-module(emqx_stats_tests). + +-include_lib("eunit/include/eunit.hrl"). + +get_state_test() -> + with_proc(fun() -> + SetConnsCount = emqx_stats:statsfun('connections/count'), + SetConnsCount(1), + 1 = emqx_stats:getstat('connections/count'), + emqx_stats:setstat('connections/count', 2), + 2 = emqx_stats:getstat('connections/count'), + emqx_stats:setstat('connections/count', 'connections/max', 3), + timer:sleep(100), + 3 = emqx_stats:getstat('connections/count'), + 3 = emqx_stats:getstat('connections/max'), + emqx_stats:setstat('connections/count', 'connections/max', 2), + timer:sleep(100), + 2 = emqx_stats:getstat('connections/count'), + 3 = emqx_stats:getstat('connections/max'), + SetConns = emqx_stats:statsfun('connections/count', 'connections/max'), + SetConns(4), + timer:sleep(100), + 4 = emqx_stats:getstat('connections/count'), + 4 = emqx_stats:getstat('connections/max'), + Conns = emqx_stats:getstats(), + 4 = proplists:get_value('connections/count', Conns), + 4 = proplists:get_value('connections/max', Conns) + end). + +update_interval_test() -> + TickMs = 100, + with_proc(fun() -> + SleepMs = TickMs * 2 + TickMs div 2, %% sleep for 2.5 ticks + emqx_stats:cancel_update(cm_stats), + UpdFun = fun() -> emqx_stats:setstat('connections/count', 1) end, + ok = emqx_stats:update_interval(stats_test, UpdFun), + timer:sleep(SleepMs), + ?assertEqual(1, emqx_stats:getstat('connections/count')) + end, TickMs). + +emqx_broker_helpe_test() -> + TickMs = 100, + with_proc(fun() -> + SleepMs = TickMs + TickMs div 2, %% sleep for 1.5 ticks + Ref = make_ref(), + Tester = self(), + UpdFun = + fun() -> + emqx_broker_helper:stats_fun(), + Tester ! Ref, + ok + end, + ok = emqx_stats:update_interval(stats_test, UpdFun), + timer:sleep(SleepMs), + receive Ref -> ok after 2000 -> error(timeout) end + end, TickMs). + +with_proc(F) -> + {ok, _Pid} = emqx_stats:start_link(), + with_stop(F). + +with_proc(F, TickMs) -> + {ok, _Pid} = emqx_stats:start_link(#{tick_ms => TickMs}), + with_stop(F). + +with_stop(F) -> + try + %% make a synced call to the gen_server so we know it has + %% started running, hence it is safe to continue with less risk of race condition + ?assertEqual(ignored, gen_server:call(emqx_stats, ignored)), + F() + after + ok = emqx_stats:stop() + end. +