diff --git a/apps/emqx/src/emqx_metrics.erl b/apps/emqx/src/emqx_metrics.erl index 2757ea492..2c8088c48 100644 --- a/apps/emqx/src/emqx_metrics.erl +++ b/apps/emqx/src/emqx_metrics.erl @@ -146,7 +146,6 @@ {counter, 'messages.dropped.expired'}, % QoS2 Messages expired {counter, 'messages.dropped.no_subscribers'}, % Messages dropped {counter, 'messages.forward'}, % Messages forward - {counter, 'messages.retained'}, % Messages retained {counter, 'messages.delayed'}, % Messages delayed {counter, 'messages.delivered'}, % Messages delivered {counter, 'messages.acked'} % Messages acked @@ -207,7 +206,7 @@ stop() -> gen_server:stop(?SERVER). %% BACKW: v4.3.0 upgrade_retained_delayed_counter_type() -> - Ks = ['messages.retained', 'messages.delayed'], + Ks = ['messages.delayed'], gen_server:call(?SERVER, {set_type_to_counter, Ks}, infinity). %%-------------------------------------------------------------------- @@ -556,7 +555,7 @@ reserved_idx('messages.dropped') -> 109; reserved_idx('messages.dropped.expired') -> 110; reserved_idx('messages.dropped.no_subscribers') -> 111; reserved_idx('messages.forward') -> 112; -reserved_idx('messages.retained') -> 113; +%%reserved_idx('messages.retained') -> 113; %% keep the index, new metrics can use this reserved_idx('messages.delayed') -> 114; reserved_idx('messages.delivered') -> 115; reserved_idx('messages.acked') -> 116; @@ -592,4 +591,3 @@ reserved_idx('olp.gc') -> 303; reserved_idx('olp.new_conn') -> 304; reserved_idx(_) -> undefined. - diff --git a/apps/emqx/test/emqx_metrics_SUITE.erl b/apps/emqx/test/emqx_metrics_SUITE.erl index f47e028bc..8b277e898 100644 --- a/apps/emqx/test/emqx_metrics_SUITE.erl +++ b/apps/emqx/test/emqx_metrics_SUITE.erl @@ -71,19 +71,10 @@ t_inc_dec(_) -> with_metrics_server( fun() -> ?assertEqual(0, emqx_metrics:val('bytes.received')), - ?assertEqual(0, emqx_metrics:val('messages.retained')), ok = emqx_metrics:inc('bytes.received'), ok = emqx_metrics:inc('bytes.received', 2), ok = emqx_metrics:inc('bytes.received', 2), - ?assertEqual(5, emqx_metrics:val('bytes.received')), - ok = emqx_metrics:inc('messages.retained', 2), - ok = emqx_metrics:inc('messages.retained', 2), - ?assertEqual(4, emqx_metrics:val('messages.retained')), - ok = emqx_metrics:dec('messages.retained'), - ok = emqx_metrics:dec('messages.retained', 1), - ?assertEqual(2, emqx_metrics:val('messages.retained')), - ok = emqx_metrics:set('messages.retained', 3), - ?assertEqual(3, emqx_metrics:val('messages.retained')) + ?assertEqual(5, emqx_metrics:val('bytes.received')) end). t_inc_recv(_) -> @@ -162,21 +153,12 @@ t_trans(_) -> ok = emqx_metrics:trans(inc, 'bytes.received'), ok = emqx_metrics:trans(inc, 'bytes.received', 2), ?assertEqual(0, emqx_metrics:val('bytes.received')), - ok = emqx_metrics:trans(inc, 'messages.retained', 2), - ok = emqx_metrics:trans(inc, 'messages.retained', 2), - ?assertEqual(0, emqx_metrics:val('messages.retained')), ok = emqx_metrics:commit(), ?assertEqual(3, emqx_metrics:val('bytes.received')), - ?assertEqual(4, emqx_metrics:val('messages.retained')), - ok = emqx_metrics:trans(dec, 'messages.retained'), - ok = emqx_metrics:trans(dec, 'messages.retained', 1), - ?assertEqual(4, emqx_metrics:val('messages.retained')), - ok = emqx_metrics:commit(), - ?assertEqual(2, emqx_metrics:val('messages.retained')) + ok = emqx_metrics:commit() end). with_metrics_server(Fun) -> {ok, _} = emqx_metrics:start_link(), _ = Fun(), ok = emqx_metrics:stop(). - diff --git a/apps/emqx/test/emqx_proper_types.erl b/apps/emqx/test/emqx_proper_types.erl index 32aba9674..fcae60294 100644 --- a/apps/emqx/test/emqx_proper_types.erl +++ b/apps/emqx/test/emqx_proper_types.erl @@ -288,7 +288,7 @@ systopic_metrics() -> <<"messages/qos2/received">>, <<"messages/qos2/sent">>, <<"messages/publish">>, <<"messages/dropped">>, <<"messages/dropped/expired">>, <<"messages/dropped/no_subscribers">>, - <<"messages/forward">>, <<"messages/retained">>, + <<"messages/forward">>, <<"messages/delayed">>, <<"messages/delivered">>, <<"messages/acked">>], ?LET({Nodename, T}, diff --git a/apps/emqx_retainer/src/emqx_retainer.erl b/apps/emqx_retainer/src/emqx_retainer.erl index e1780cc08..51a28897c 100644 --- a/apps/emqx_retainer/src/emqx_retainer.erl +++ b/apps/emqx_retainer/src/emqx_retainer.erl @@ -37,7 +37,9 @@ , clean/0 , delete/1 , page_read/3 - , post_config_update/5]). + , post_config_update/5 + , stats_fun/0 + ]). %% gen_server callbacks -export([ init/1 @@ -69,6 +71,7 @@ -callback match_messages(context(), topic(), cursor()) -> {ok, list(), cursor()}. -callback clear_expired(context()) -> ok. -callback clean(context()) -> ok. +-callback size(context()) -> non_neg_integer(). %%-------------------------------------------------------------------- %% Hook API @@ -185,6 +188,9 @@ post_config_update(_, _UpdateReq, NewConf, OldConf, _AppEnvs) -> call(Req) -> gen_server:call(?MODULE, Req, infinity). +stats_fun() -> + gen_server:cast(?MODULE, ?FUNCTION_NAME). + %%-------------------------------------------------------------------- %% gen_server callbacks %%-------------------------------------------------------------------- @@ -226,6 +232,12 @@ handle_call(Req, _From, State) -> ?LOG(error, "Unexpected call: ~p", [Req]), {reply, ignored, State}. +handle_cast(stats_fun, #{context := Context} = State) -> + Mod = get_backend_module(), + Size = Mod:size(Context), + emqx_stats:setstat('retained.count', 'retained.max', Size), + {noreply, State}; + handle_cast(Msg, State) -> ?LOG(error, "Unexpected cast: ~p", [Msg]), {noreply, State}. @@ -485,8 +497,11 @@ close_resource(_) -> load(Context) -> _ = emqx:hook('session.subscribed', {?MODULE, on_session_subscribed, [Context]}), _ = emqx:hook('message.publish', {?MODULE, on_message_publish, [Context]}), + emqx_stats:update_interval(emqx_retainer_stats, fun ?MODULE:stats_fun/0), ok. unload() -> emqx:unhook('message.publish', {?MODULE, on_message_publish}), - emqx:unhook('session.subscribed', {?MODULE, on_session_subscribed}). + emqx:unhook('session.subscribed', {?MODULE, on_session_subscribed}), + emqx_stats:cancel_update(emqx_retainer_stats), + ok. diff --git a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl index 89db7108a..cc2cd8f44 100644 --- a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl +++ b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl @@ -30,7 +30,9 @@ , page_read/4 , match_messages/3 , clear_expired/1 - , clean/1]). + , clean/1 + , size/1 + ]). -export([create_resource/1]). @@ -73,7 +75,6 @@ store_retained(_, Msg =#message{topic = Topic}) -> ExpiryTime = emqx_retainer:get_expiry_time(Msg), case is_table_full() of false -> - ok = emqx_metrics:inc('messages.retained'), mria:dirty_write(?TAB, #retained{topic = topic2tokens(Topic), msg = Msg, @@ -156,6 +157,10 @@ match_messages(_, Topic, Cursor) -> clean(_) -> _ = mria:clear_table(?TAB), ok. + +size(_) -> + table_size(). + %%-------------------------------------------------------------------- %% Internal functions %%--------------------------------------------------------------------