Change more stats callbacks to full M:F/A
Including emqx_sm emqx_cm emqx_router_helper
This commit is contained in:
parent
4082f3ade2
commit
7e7d99fbad
|
@ -30,6 +30,9 @@
|
||||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
|
||||||
code_change/3]).
|
code_change/3]).
|
||||||
|
|
||||||
|
%% internal export
|
||||||
|
-export([update_conn_stats/0]).
|
||||||
|
|
||||||
-define(CM, ?MODULE).
|
-define(CM, ?MODULE).
|
||||||
|
|
||||||
%% ETS Tables.
|
%% ETS Tables.
|
||||||
|
@ -125,7 +128,7 @@ init([]) ->
|
||||||
_ = emqx_tables:new(?CONN_TAB, [{read_concurrency, true} | TabOpts]),
|
_ = emqx_tables:new(?CONN_TAB, [{read_concurrency, true} | TabOpts]),
|
||||||
_ = emqx_tables:new(?CONN_ATTRS_TAB, TabOpts),
|
_ = emqx_tables:new(?CONN_ATTRS_TAB, TabOpts),
|
||||||
_ = emqx_tables:new(?CONN_STATS_TAB, TabOpts),
|
_ = emqx_tables:new(?CONN_STATS_TAB, TabOpts),
|
||||||
ok = emqx_stats:update_interval(cm_stats, fun update_conn_stats/0),
|
ok = emqx_stats:update_interval(cm_stats, fun ?MODULE:update_conn_stats/0),
|
||||||
{ok, #{conn_pmon => emqx_pmon:new()}}.
|
{ok, #{conn_pmon => emqx_pmon:new()}}.
|
||||||
|
|
||||||
handle_call(Req, _From, State) ->
|
handle_call(Req, _From, State) ->
|
||||||
|
|
|
@ -31,6 +31,9 @@
|
||||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
|
||||||
code_change/3]).
|
code_change/3]).
|
||||||
|
|
||||||
|
%% internal export
|
||||||
|
-export([stats_fun/0]).
|
||||||
|
|
||||||
-record(routing_node, {name, const = unused}).
|
-record(routing_node, {name, const = unused}).
|
||||||
-record(state, {nodes = []}).
|
-record(state, {nodes = []}).
|
||||||
|
|
||||||
|
@ -90,7 +93,7 @@ init([]) ->
|
||||||
[Node | Acc]
|
[Node | Acc]
|
||||||
end
|
end
|
||||||
end, [], mnesia:dirty_all_keys(?ROUTING_NODE)),
|
end, [], mnesia:dirty_all_keys(?ROUTING_NODE)),
|
||||||
emqx_stats:update_interval(route_stats, stats_fun()),
|
emqx_stats:update_interval(route_stats, fun ?MODULE:stats_fun/0),
|
||||||
{ok, #state{nodes = Nodes}, hibernate}.
|
{ok, #state{nodes = Nodes}, hibernate}.
|
||||||
|
|
||||||
handle_call(Req, _From, State) ->
|
handle_call(Req, _From, State) ->
|
||||||
|
@ -143,13 +146,11 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
stats_fun() ->
|
stats_fun() ->
|
||||||
fun() ->
|
case ets:info(?ROUTE, size) of
|
||||||
case ets:info(?ROUTE, size) of
|
undefined -> ok;
|
||||||
undefined -> ok;
|
Size ->
|
||||||
Size ->
|
emqx_stats:setstat('routes/count', 'routes/max', Size),
|
||||||
emqx_stats:setstat('routes/count', 'routes/max', Size),
|
emqx_stats:setstat('topics/count', 'topics/max', Size)
|
||||||
emqx_stats:setstat('topics/count', 'topics/max', Size)
|
|
||||||
end
|
|
||||||
end.
|
end.
|
||||||
|
|
||||||
cleanup_routes(Node) ->
|
cleanup_routes(Node) ->
|
||||||
|
|
|
@ -31,6 +31,9 @@
|
||||||
%% Internal functions for rpc
|
%% Internal functions for rpc
|
||||||
-export([dispatch/3]).
|
-export([dispatch/3]).
|
||||||
|
|
||||||
|
%% Internal function for stats
|
||||||
|
-export([stats_fun/0]).
|
||||||
|
|
||||||
%% gen_server callbacks
|
%% gen_server callbacks
|
||||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
|
||||||
code_change/3]).
|
code_change/3]).
|
||||||
|
@ -210,7 +213,7 @@ init([]) ->
|
||||||
_ = emqx_tables:new(?SESSION_P_TAB, TabOpts),
|
_ = emqx_tables:new(?SESSION_P_TAB, TabOpts),
|
||||||
_ = emqx_tables:new(?SESSION_ATTRS_TAB, TabOpts),
|
_ = emqx_tables:new(?SESSION_ATTRS_TAB, TabOpts),
|
||||||
_ = emqx_tables:new(?SESSION_STATS_TAB, TabOpts),
|
_ = emqx_tables:new(?SESSION_STATS_TAB, TabOpts),
|
||||||
emqx_stats:update_interval(sm_stats, stats_fun()),
|
emqx_stats:update_interval(sm_stats, fun ?MODULE:stats_fun/0),
|
||||||
{ok, #{session_pmon => emqx_pmon:new()}}.
|
{ok, #{session_pmon => emqx_pmon:new()}}.
|
||||||
|
|
||||||
handle_call(Req, _From, State) ->
|
handle_call(Req, _From, State) ->
|
||||||
|
@ -251,10 +254,8 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
stats_fun() ->
|
stats_fun() ->
|
||||||
fun() ->
|
safe_update_stats(?SESSION_TAB, 'sessions/count', 'sessions/max'),
|
||||||
safe_update_stats(?SESSION_TAB, 'sessions/count', 'sessions/max'),
|
safe_update_stats(?SESSION_P_TAB, 'sessions/persistent/count', 'sessions/persistent/max').
|
||||||
safe_update_stats(?SESSION_P_TAB, 'sessions/persistent/count', 'sessions/persistent/max')
|
|
||||||
end.
|
|
||||||
|
|
||||||
safe_update_stats(Tab, Stat, MaxStat) ->
|
safe_update_stats(Tab, Stat, MaxStat) ->
|
||||||
case ets:info(Tab, size) of
|
case ets:info(Tab, size) of
|
||||||
|
|
|
@ -42,7 +42,7 @@ get_state_test() ->
|
||||||
end).
|
end).
|
||||||
|
|
||||||
update_interval_test() ->
|
update_interval_test() ->
|
||||||
TickMs = 100,
|
TickMs = 200,
|
||||||
with_proc(fun() ->
|
with_proc(fun() ->
|
||||||
SleepMs = TickMs * 2 + TickMs div 2, %% sleep for 2.5 ticks
|
SleepMs = TickMs * 2 + TickMs div 2, %% sleep for 2.5 ticks
|
||||||
emqx_stats:cancel_update(cm_stats),
|
emqx_stats:cancel_update(cm_stats),
|
||||||
|
@ -52,22 +52,34 @@ update_interval_test() ->
|
||||||
?assertEqual(1, emqx_stats:getstat('connections/count'))
|
?assertEqual(1, emqx_stats:getstat('connections/count'))
|
||||||
end, TickMs).
|
end, TickMs).
|
||||||
|
|
||||||
emqx_broker_helpe_test() ->
|
helper_test_() ->
|
||||||
TickMs = 100,
|
TickMs = 200,
|
||||||
with_proc(fun() ->
|
TestF =
|
||||||
SleepMs = TickMs + TickMs div 2, %% sleep for 1.5 ticks
|
fun(CbModule, CbFun) ->
|
||||||
Ref = make_ref(),
|
SleepMs = TickMs + TickMs div 2, %% sleep for 1.5 ticks
|
||||||
Tester = self(),
|
Ref = make_ref(),
|
||||||
UpdFun =
|
Tester = self(),
|
||||||
fun() ->
|
UpdFun =
|
||||||
emqx_broker_helper:stats_fun(),
|
fun() ->
|
||||||
Tester ! Ref,
|
CbModule:CbFun(),
|
||||||
ok
|
Tester ! Ref,
|
||||||
end,
|
ok
|
||||||
ok = emqx_stats:update_interval(stats_test, UpdFun),
|
end,
|
||||||
timer:sleep(SleepMs),
|
ok = emqx_stats:update_interval(stats_test, UpdFun),
|
||||||
receive Ref -> ok after 2000 -> error(timeout) end
|
timer:sleep(SleepMs),
|
||||||
end, TickMs).
|
receive Ref -> ok after 2000 -> error(timeout) end
|
||||||
|
end,
|
||||||
|
MkTestFun =
|
||||||
|
fun(CbModule, CbFun) ->
|
||||||
|
fun() ->
|
||||||
|
with_proc(fun() -> TestF(CbModule, CbFun) end, TickMs)
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
[{"emqx_broker_helper", MkTestFun(emqx_broker_helper, stats_fun)},
|
||||||
|
{"emqx_sm", MkTestFun(emqx_sm, stats_fun)},
|
||||||
|
{"emqx_router_helper", MkTestFun(emqx_router_helper, stats_fun)},
|
||||||
|
{"emqx_cm", MkTestFun(emqx_cm, update_conn_stats)}
|
||||||
|
].
|
||||||
|
|
||||||
with_proc(F) ->
|
with_proc(F) ->
|
||||||
{ok, _Pid} = emqx_stats:start_link(),
|
{ok, _Pid} = emqx_stats:start_link(),
|
||||||
|
|
Loading…
Reference in New Issue