Merge pull request #6622 from lafirest/fix/retainer_stats
fix(emqx_retainer): fix the stats function of retainer
This commit is contained in:
commit
6a80f8320d
|
@ -146,7 +146,6 @@
|
||||||
{counter, 'messages.dropped.expired'}, % QoS2 Messages expired
|
{counter, 'messages.dropped.expired'}, % QoS2 Messages expired
|
||||||
{counter, 'messages.dropped.no_subscribers'}, % Messages dropped
|
{counter, 'messages.dropped.no_subscribers'}, % Messages dropped
|
||||||
{counter, 'messages.forward'}, % Messages forward
|
{counter, 'messages.forward'}, % Messages forward
|
||||||
{counter, 'messages.retained'}, % Messages retained
|
|
||||||
{counter, 'messages.delayed'}, % Messages delayed
|
{counter, 'messages.delayed'}, % Messages delayed
|
||||||
{counter, 'messages.delivered'}, % Messages delivered
|
{counter, 'messages.delivered'}, % Messages delivered
|
||||||
{counter, 'messages.acked'} % Messages acked
|
{counter, 'messages.acked'} % Messages acked
|
||||||
|
@ -207,7 +206,7 @@ stop() -> gen_server:stop(?SERVER).
|
||||||
|
|
||||||
%% BACKW: v4.3.0
|
%% BACKW: v4.3.0
|
||||||
upgrade_retained_delayed_counter_type() ->
|
upgrade_retained_delayed_counter_type() ->
|
||||||
Ks = ['messages.retained', 'messages.delayed'],
|
Ks = ['messages.delayed'],
|
||||||
gen_server:call(?SERVER, {set_type_to_counter, Ks}, infinity).
|
gen_server:call(?SERVER, {set_type_to_counter, Ks}, infinity).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -556,7 +555,7 @@ reserved_idx('messages.dropped') -> 109;
|
||||||
reserved_idx('messages.dropped.expired') -> 110;
|
reserved_idx('messages.dropped.expired') -> 110;
|
||||||
reserved_idx('messages.dropped.no_subscribers') -> 111;
|
reserved_idx('messages.dropped.no_subscribers') -> 111;
|
||||||
reserved_idx('messages.forward') -> 112;
|
reserved_idx('messages.forward') -> 112;
|
||||||
reserved_idx('messages.retained') -> 113;
|
%%reserved_idx('messages.retained') -> 113; %% keep the index, new metrics can use this
|
||||||
reserved_idx('messages.delayed') -> 114;
|
reserved_idx('messages.delayed') -> 114;
|
||||||
reserved_idx('messages.delivered') -> 115;
|
reserved_idx('messages.delivered') -> 115;
|
||||||
reserved_idx('messages.acked') -> 116;
|
reserved_idx('messages.acked') -> 116;
|
||||||
|
@ -592,4 +591,3 @@ reserved_idx('olp.gc') -> 303;
|
||||||
reserved_idx('olp.new_conn') -> 304;
|
reserved_idx('olp.new_conn') -> 304;
|
||||||
|
|
||||||
reserved_idx(_) -> undefined.
|
reserved_idx(_) -> undefined.
|
||||||
|
|
||||||
|
|
|
@ -71,19 +71,10 @@ t_inc_dec(_) ->
|
||||||
with_metrics_server(
|
with_metrics_server(
|
||||||
fun() ->
|
fun() ->
|
||||||
?assertEqual(0, emqx_metrics:val('bytes.received')),
|
?assertEqual(0, emqx_metrics:val('bytes.received')),
|
||||||
?assertEqual(0, emqx_metrics:val('messages.retained')),
|
|
||||||
ok = emqx_metrics:inc('bytes.received'),
|
ok = emqx_metrics:inc('bytes.received'),
|
||||||
ok = emqx_metrics:inc('bytes.received', 2),
|
ok = emqx_metrics:inc('bytes.received', 2),
|
||||||
ok = emqx_metrics:inc('bytes.received', 2),
|
ok = emqx_metrics:inc('bytes.received', 2),
|
||||||
?assertEqual(5, emqx_metrics:val('bytes.received')),
|
?assertEqual(5, emqx_metrics:val('bytes.received'))
|
||||||
ok = emqx_metrics:inc('messages.retained', 2),
|
|
||||||
ok = emqx_metrics:inc('messages.retained', 2),
|
|
||||||
?assertEqual(4, emqx_metrics:val('messages.retained')),
|
|
||||||
ok = emqx_metrics:dec('messages.retained'),
|
|
||||||
ok = emqx_metrics:dec('messages.retained', 1),
|
|
||||||
?assertEqual(2, emqx_metrics:val('messages.retained')),
|
|
||||||
ok = emqx_metrics:set('messages.retained', 3),
|
|
||||||
?assertEqual(3, emqx_metrics:val('messages.retained'))
|
|
||||||
end).
|
end).
|
||||||
|
|
||||||
t_inc_recv(_) ->
|
t_inc_recv(_) ->
|
||||||
|
@ -162,21 +153,12 @@ t_trans(_) ->
|
||||||
ok = emqx_metrics:trans(inc, 'bytes.received'),
|
ok = emqx_metrics:trans(inc, 'bytes.received'),
|
||||||
ok = emqx_metrics:trans(inc, 'bytes.received', 2),
|
ok = emqx_metrics:trans(inc, 'bytes.received', 2),
|
||||||
?assertEqual(0, emqx_metrics:val('bytes.received')),
|
?assertEqual(0, emqx_metrics:val('bytes.received')),
|
||||||
ok = emqx_metrics:trans(inc, 'messages.retained', 2),
|
|
||||||
ok = emqx_metrics:trans(inc, 'messages.retained', 2),
|
|
||||||
?assertEqual(0, emqx_metrics:val('messages.retained')),
|
|
||||||
ok = emqx_metrics:commit(),
|
ok = emqx_metrics:commit(),
|
||||||
?assertEqual(3, emqx_metrics:val('bytes.received')),
|
?assertEqual(3, emqx_metrics:val('bytes.received')),
|
||||||
?assertEqual(4, emqx_metrics:val('messages.retained')),
|
ok = emqx_metrics:commit()
|
||||||
ok = emqx_metrics:trans(dec, 'messages.retained'),
|
|
||||||
ok = emqx_metrics:trans(dec, 'messages.retained', 1),
|
|
||||||
?assertEqual(4, emqx_metrics:val('messages.retained')),
|
|
||||||
ok = emqx_metrics:commit(),
|
|
||||||
?assertEqual(2, emqx_metrics:val('messages.retained'))
|
|
||||||
end).
|
end).
|
||||||
|
|
||||||
with_metrics_server(Fun) ->
|
with_metrics_server(Fun) ->
|
||||||
{ok, _} = emqx_metrics:start_link(),
|
{ok, _} = emqx_metrics:start_link(),
|
||||||
_ = Fun(),
|
_ = Fun(),
|
||||||
ok = emqx_metrics:stop().
|
ok = emqx_metrics:stop().
|
||||||
|
|
||||||
|
|
|
@ -288,7 +288,7 @@ systopic_metrics() ->
|
||||||
<<"messages/qos2/received">>, <<"messages/qos2/sent">>,
|
<<"messages/qos2/received">>, <<"messages/qos2/sent">>,
|
||||||
<<"messages/publish">>, <<"messages/dropped">>,
|
<<"messages/publish">>, <<"messages/dropped">>,
|
||||||
<<"messages/dropped/expired">>, <<"messages/dropped/no_subscribers">>,
|
<<"messages/dropped/expired">>, <<"messages/dropped/no_subscribers">>,
|
||||||
<<"messages/forward">>, <<"messages/retained">>,
|
<<"messages/forward">>,
|
||||||
<<"messages/delayed">>, <<"messages/delivered">>,
|
<<"messages/delayed">>, <<"messages/delivered">>,
|
||||||
<<"messages/acked">>],
|
<<"messages/acked">>],
|
||||||
?LET({Nodename, T},
|
?LET({Nodename, T},
|
||||||
|
|
|
@ -37,7 +37,9 @@
|
||||||
, clean/0
|
, clean/0
|
||||||
, delete/1
|
, delete/1
|
||||||
, page_read/3
|
, page_read/3
|
||||||
, post_config_update/5]).
|
, post_config_update/5
|
||||||
|
, stats_fun/0
|
||||||
|
]).
|
||||||
|
|
||||||
%% gen_server callbacks
|
%% gen_server callbacks
|
||||||
-export([ init/1
|
-export([ init/1
|
||||||
|
@ -69,6 +71,7 @@
|
||||||
-callback match_messages(context(), topic(), cursor()) -> {ok, list(), cursor()}.
|
-callback match_messages(context(), topic(), cursor()) -> {ok, list(), cursor()}.
|
||||||
-callback clear_expired(context()) -> ok.
|
-callback clear_expired(context()) -> ok.
|
||||||
-callback clean(context()) -> ok.
|
-callback clean(context()) -> ok.
|
||||||
|
-callback size(context()) -> non_neg_integer().
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Hook API
|
%% Hook API
|
||||||
|
@ -185,6 +188,9 @@ post_config_update(_, _UpdateReq, NewConf, OldConf, _AppEnvs) ->
|
||||||
call(Req) ->
|
call(Req) ->
|
||||||
gen_server:call(?MODULE, Req, infinity).
|
gen_server:call(?MODULE, Req, infinity).
|
||||||
|
|
||||||
|
stats_fun() ->
|
||||||
|
gen_server:cast(?MODULE, ?FUNCTION_NAME).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% gen_server callbacks
|
%% gen_server callbacks
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -226,6 +232,12 @@ handle_call(Req, _From, State) ->
|
||||||
?LOG(error, "Unexpected call: ~p", [Req]),
|
?LOG(error, "Unexpected call: ~p", [Req]),
|
||||||
{reply, ignored, State}.
|
{reply, ignored, State}.
|
||||||
|
|
||||||
|
handle_cast(stats_fun, #{context := Context} = State) ->
|
||||||
|
Mod = get_backend_module(),
|
||||||
|
Size = Mod:size(Context),
|
||||||
|
emqx_stats:setstat('retained.count', 'retained.max', Size),
|
||||||
|
{noreply, State};
|
||||||
|
|
||||||
handle_cast(Msg, State) ->
|
handle_cast(Msg, State) ->
|
||||||
?LOG(error, "Unexpected cast: ~p", [Msg]),
|
?LOG(error, "Unexpected cast: ~p", [Msg]),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
@ -485,8 +497,11 @@ close_resource(_) ->
|
||||||
load(Context) ->
|
load(Context) ->
|
||||||
_ = emqx:hook('session.subscribed', {?MODULE, on_session_subscribed, [Context]}),
|
_ = emqx:hook('session.subscribed', {?MODULE, on_session_subscribed, [Context]}),
|
||||||
_ = emqx:hook('message.publish', {?MODULE, on_message_publish, [Context]}),
|
_ = emqx:hook('message.publish', {?MODULE, on_message_publish, [Context]}),
|
||||||
|
emqx_stats:update_interval(emqx_retainer_stats, fun ?MODULE:stats_fun/0),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
unload() ->
|
unload() ->
|
||||||
emqx:unhook('message.publish', {?MODULE, on_message_publish}),
|
emqx:unhook('message.publish', {?MODULE, on_message_publish}),
|
||||||
emqx:unhook('session.subscribed', {?MODULE, on_session_subscribed}).
|
emqx:unhook('session.subscribed', {?MODULE, on_session_subscribed}),
|
||||||
|
emqx_stats:cancel_update(emqx_retainer_stats),
|
||||||
|
ok.
|
||||||
|
|
|
@ -30,7 +30,9 @@
|
||||||
, page_read/4
|
, page_read/4
|
||||||
, match_messages/3
|
, match_messages/3
|
||||||
, clear_expired/1
|
, clear_expired/1
|
||||||
, clean/1]).
|
, clean/1
|
||||||
|
, size/1
|
||||||
|
]).
|
||||||
|
|
||||||
-export([create_resource/1]).
|
-export([create_resource/1]).
|
||||||
|
|
||||||
|
@ -73,7 +75,6 @@ store_retained(_, Msg =#message{topic = Topic}) ->
|
||||||
ExpiryTime = emqx_retainer:get_expiry_time(Msg),
|
ExpiryTime = emqx_retainer:get_expiry_time(Msg),
|
||||||
case is_table_full() of
|
case is_table_full() of
|
||||||
false ->
|
false ->
|
||||||
ok = emqx_metrics:inc('messages.retained'),
|
|
||||||
mria:dirty_write(?TAB,
|
mria:dirty_write(?TAB,
|
||||||
#retained{topic = topic2tokens(Topic),
|
#retained{topic = topic2tokens(Topic),
|
||||||
msg = Msg,
|
msg = Msg,
|
||||||
|
@ -156,6 +157,10 @@ match_messages(_, Topic, Cursor) ->
|
||||||
clean(_) ->
|
clean(_) ->
|
||||||
_ = mria:clear_table(?TAB),
|
_ = mria:clear_table(?TAB),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
size(_) ->
|
||||||
|
table_size().
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Internal functions
|
%% Internal functions
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
Loading…
Reference in New Issue