diff --git a/etc/emqx.conf b/etc/emqx.conf index 7876be59b..d23ee2d5f 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -479,6 +479,15 @@ mqtt.shared_subscription = true ## Value: true | false mqtt.ignore_loop_deliver = false +##-------------------------------------------------------------------- +## Metric +##-------------------------------------------------------------------- + +## Commit interval for metric +## +## Value: Duration +metric.commit_interval = 10s + ##-------------------------------------------------------------------- ## Zones ##-------------------------------------------------------------------- diff --git a/priv/emqx.schema b/priv/emqx.schema index 1001ab5a8..70ded106c 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -616,6 +616,15 @@ end}. {datatype, {enum, [true, false]}} ]}. +%%-------------------------------------------------------------------- +%% Metirc +%%-------------------------------------------------------------------- +%% @doc Commit interval for metric +{mapping, "metric.commit_interval", "emqx.metric_commit_interval", [ + {default, "10s"}, + {datatype, {duration, ms}} +]}. + %%-------------------------------------------------------------------- %% Zones %%-------------------------------------------------------------------- diff --git a/src/emqx_broker.erl b/src/emqx_broker.erl index ff53554d4..d684c6273 100644 --- a/src/emqx_broker.erl +++ b/src/emqx_broker.erl @@ -321,14 +321,14 @@ topics() -> emqx_router:topics(). init([Pool, Id]) -> true = gproc_pool:connect_worker(Pool, {Pool, Id}), + MetricCommitInterval = emqx_config:get_env(metric_commit_interval, 10000), + emqx_metrics:start_timer(MetricCommitInterval, MetricCommitInterval div 2, {metric_commit, MetricCommitInterval}), {ok, #state{pool = Pool, id = Id, submap = #{}, submon = emqx_pmon:new()}}. handle_call(Req, _From, State) -> emqx_logger:error("[Broker] unexpected call: ~p", [Req]), {reply, ignored, State}. - - handle_cast({From, #subscribe{topic = Topic, subpid = SubPid, subid = SubId, subopts = SubOpts}}, State) -> Subscriber = {SubPid, SubId}, case ets:member(?SUBOPTION, {Topic, Subscriber}) of @@ -373,6 +373,11 @@ handle_info({'DOWN', _MRef, process, SubPid, Reason}, State = #state{submap = Su {noreply, State} end; +handle_info({timeout, _Timer, {metric_commit, MetricCommitInterval}}, State) -> + emqx_metrics:commit(), + emqx_metrics:start_timer(MetricCommitInterval, {metric_commit, MetricCommitInterval}), + {noreply, State}; + handle_info(Info, State) -> emqx_logger:error("[Broker] unexpected info: ~p", [Info]), {noreply, State}. diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 609ce25d6..3516ec8bd 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -154,6 +154,10 @@ init([Transport, RawSocket, Options]) -> ok = emqx_misc:init_proc_mng_policy(Zone), emqx_logger:set_metadata_peername(esockd_net:format(Peername)), + MetricCommitInterval = emqx_config:get_env(metric_commit_interval, 10000), + emqx_metrics:start_timer(MetricCommitInterval, + MetricCommitInterval div 2, + {metric_commit, MetricCommitInterval}), gen_server:enter_loop(?MODULE, [{hibernate_after, IdleTimout}], State, self(), IdleTimout); {error, Reason} -> @@ -228,6 +232,10 @@ handle_info({timeout, Timer, emit_stats}, ?LOG(warning, "shutdown due to ~p", [Reason]), shutdown(Reason, NewState) end; +handle_info({timeout, _Timer, {metric_commit, MetricCommitInterval}}, State) -> + emqx_metrics:commit(), + emqx_metrics:start_timer(MetricCommitInterval, {metric_commit, MetricCommitInterval}), + {noreply, State}; handle_info(timeout, State) -> shutdown(idle_timeout, State); diff --git a/src/emqx_metrics.erl b/src/emqx_metrics.erl index 8130a307e..443af5a4b 100644 --- a/src/emqx_metrics.erl +++ b/src/emqx_metrics.erl @@ -18,7 +18,8 @@ -export([start_link/0]). -export([new/1, all/0]). --export([val/1, inc/1, inc/2, inc/3, dec/2, dec/3, set/2]). +-export([val/1, inc/1, inc/2, inc/3, dec/2, dec/3, set/2, commit/0]). +-export([start_timer/2, start_timer/3]). %% Received/sent metrics -export([received/1, sent/1]). @@ -133,10 +134,8 @@ inc(Metric, Val) when is_atom(Metric) -> %% @doc Increase metric value -spec(inc(counter | gauge, atom(), pos_integer()) -> pos_integer()). -inc(gauge, Metric, Val) -> - update_counter(key(gauge, Metric), {2, Val}); -inc(counter, Metric, Val) -> - update_counter(key(counter, Metric), {2, Val}). +inc(Type, Metric, Val) -> + hold(Type, Metric, Val). %% @doc Decrease metric value -spec(dec(gauge, atom()) -> integer()). @@ -146,7 +145,7 @@ dec(gauge, Metric) -> %% @doc Decrease metric value -spec(dec(gauge, atom(), pos_integer()) -> integer()). dec(gauge, Metric, Val) -> - update_counter(key(gauge, Metric), {2, -Val}). + hold(gauge, Metric, -Val). %% @doc Set metric value set(Metric, Val) when is_atom(Metric) -> @@ -154,6 +153,44 @@ set(Metric, Val) when is_atom(Metric) -> set(gauge, Metric, Val) -> ets:insert(?TAB, {key(gauge, Metric), Val}). +% -spec(hold(counter | gauge, atom(), inc_dec | assign, integer()) -> integer()). +hold(Type, Metric, Val) when Type =:= counter orelse Type =:= gauge -> + NewMetrics = case get(metrics) of + undefined -> + #{Metric => {Type, Val}}; + Metrics -> + {Type, Count} = maps:get(Metric, Metrics, {Type, 0}), + Metrics#{Metric => {Type, Count + Val}} + end, + put(metrics, NewMetrics). + +commit() -> + case get(metrics) of + undefined -> + ok; + Metrics -> + maps:fold(fun(Metric, {Type, Val}, Acc) -> + update_counter(key(Type, Metric), {2, Val}), + Acc + end, 0, Metrics), + put(metrics, #{}) + end. + +-spec(start_timer(integer(), term()) -> reference() | undefined). +start_timer(Interval, Msg) -> + start_timer(Interval, 0, Msg). + +-spec(start_timer(integer(), integer(), term()) -> reference() | undefined). +start_timer(Interval, MaxJitter, Msg) when Interval > 0 -> + emqx_misc:start_timer((Interval + case MaxJitter >= 1 of + true -> + rand:uniform(MaxJitter); + false -> + 0 + end), Msg); +start_timer(_Interval, _Jitter, _Msg) -> + undefined. + %% @doc Metric key key(gauge, Metric) -> {Metric, 0}; diff --git a/src/emqx_session.erl b/src/emqx_session.erl index b223dfdda..8ae810bfe 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -377,6 +377,10 @@ init([Parent, #{zone := Zone, ok = emqx_gc:init(GcPolicy), ok = emqx_misc:init_proc_mng_policy(Zone), ok = proc_lib:init_ack(Parent, {ok, self()}), + MetricCommitInterval = emqx_config:get_env(metric_commit_interval, 10000), + emqx_metrics:start_timer(MetricCommitInterval, + MetricCommitInterval div 2, + {metric_commit, MetricCommitInterval}), gen_server:enter_loop(?MODULE, [{hibernate_after, IdleTimout}], State). init_mqueue(Zone) -> @@ -624,6 +628,11 @@ handle_info({timeout, Timer, will_delay}, State = #state{will_msg = WillMsg, wil send_willmsg(WillMsg), {noreply, State#state{will_msg = undefined}}; +handle_info({timeout, _Timer, {metric_commit, MetricCommitInterval}}, State) -> + emqx_metrics:commit(), + emqx_metrics:start_timer(MetricCommitInterval, {metric_commit, MetricCommitInterval}), + {noreply, State}; + handle_info({'EXIT', ConnPid, Reason}, State = #state{will_msg = WillMsg, expiry_interval = 0, conn_pid = ConnPid}) -> send_willmsg(WillMsg), {stop, Reason, State#state{will_msg = undefined, conn_pid = undefined}}; diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index 77d2c058f..ae03f61dc 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -137,6 +137,10 @@ websocket_init(#state{request = Req, options = Options}) -> lists:foreach(fun(Stat) -> put(Stat, 0) end, ?SOCK_STATS), emqx_logger:set_metadata_peername(esockd_net:format(Peername)), + MetricCommitInterval = emqx_config:get_env(metric_commit_interval, 10000), + emqx_metrics:start_timer(MetricCommitInterval, + MetricCommitInterval div 2, + {metric_commit, MetricCommitInterval}), {ok, #state{peername = Peername, sockname = Sockname, parser_state = ParserState, @@ -226,6 +230,11 @@ websocket_info({timeout, Timer, emit_stats}, emqx_cm:set_conn_stats(emqx_protocol:client_id(ProtoState), stats(State)), {ok, State#state{stats_timer = undefined}, hibernate}; +websocket_info({timeout, _Timer, {metric_commit, MetricCommitInterval}}, State) -> + emqx_metrics:commit(), + emqx_metrics:start_timer(MetricCommitInterval, {metric_commit, MetricCommitInterval}), + {ok, State, hibernate}; + websocket_info({keepalive, start, Interval}, State) -> ?LOG(debug, "Keepalive at the interval of ~p", [Interval]), case emqx_keepalive:start(stat_fun(), Interval, {keepalive, check}) of diff --git a/test/emqx_metrics_SUITE.erl b/test/emqx_metrics_SUITE.erl index 8e601562c..b6f2d42a1 100644 --- a/test/emqx_metrics_SUITE.erl +++ b/test/emqx_metrics_SUITE.erl @@ -29,11 +29,15 @@ t_inc_dec_metrics(_) -> emqx_metrics:inc(counter, 'bytes/received', 2), emqx_metrics:inc({gauge, 'messages/retained'}, 2), emqx_metrics:inc(gauge, 'messages/retained', 2), + emqx_metrics:commit(), {5, 4} = {emqx_metrics:val('bytes/received'), emqx_metrics:val('messages/retained')}, emqx_metrics:dec(gauge, 'messages/retained'), emqx_metrics:dec(gauge, 'messages/retained', 1), + emqx_metrics:commit(), 2 = emqx_metrics:val('messages/retained'), emqx_metrics:received(#mqtt_packet{header = #mqtt_packet_header{type = ?CONNECT}}), + emqx_metrics:commit(), {1, 1} = {emqx_metrics:val('packets/received'), emqx_metrics:val('packets/connect')}, emqx_metrics:sent(#mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK}}), + emqx_metrics:commit(), {1, 1} = {emqx_metrics:val('packets/sent'), emqx_metrics:val('packets/connack')}. diff --git a/test/emqx_session_SUITE.erl b/test/emqx_session_SUITE.erl index 2bae869b6..d1a7357b1 100644 --- a/test/emqx_session_SUITE.erl +++ b/test/emqx_session_SUITE.erl @@ -45,6 +45,7 @@ ignore_loop(_Config) -> application:set_env(emqx, mqtt_ignore_loop_deliver, false). t_session_all(_) -> + application:set_env(emqx, metric_commit_interval, 10), ClientId = <<"ClientId">>, {ok, ConnPid} = emqx_mock_client:start_link(ClientId), {ok, SPid} = emqx_mock_client:open_session(ConnPid, ClientId, internal),