From be7de756c6bd5c39a6fc5b2a4c7914d113e06f46 Mon Sep 17 00:00:00 2001 From: terry-xiaoyu <506895667@qq.com> Date: Tue, 28 May 2019 16:57:27 +0800 Subject: [PATCH 1/9] Stop alarm before emqx stops --- src/emqx_app.erl | 1 + 1 file changed, 1 insertion(+) diff --git a/src/emqx_app.erl b/src/emqx_app.erl index 47075922b..8e8ce4c1b 100644 --- a/src/emqx_app.erl +++ b/src/emqx_app.erl @@ -45,6 +45,7 @@ start(_Type, _Args) -> -spec(stop(State :: term()) -> term()). stop(_State) -> + emqx_alarm_handler:unload(), emqx_listeners:stop(), emqx_modules:unload(). From e29cde60a9cf1e89d09f9dc47866d39cd0c483d4 Mon Sep 17 00:00:00 2001 From: terry-xiaoyu <506895667@qq.com> Date: Thu, 30 May 2019 18:58:37 +0800 Subject: [PATCH 2/9] Update ekka to v0.5.5 --- rebar.config | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rebar.config b/rebar.config index fdc7954d4..1f82fef6e 100644 --- a/rebar.config +++ b/rebar.config @@ -3,7 +3,7 @@ , {cowboy, "2.6.1"} % hex , {gproc, "0.8.0"} % hex , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.3.1"}}} - , {ekka, {git, "https://github.com/emqx/ekka", {tag, "v0.5.4"}}} + , {ekka, {git, "https://github.com/emqx/ekka", {tag, "v0.5.5"}}} , {replayq, {git, "https://github.com/emqx/replayq", {tag, "v0.1.1"}}} , {esockd, {git, "https://github.com/emqx/esockd", {tag, "v5.4.4"}}} , {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}} From 97152346269eca14180e550c6dfdc13045dae4c1 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Thu, 30 May 2019 14:29:09 +0800 Subject: [PATCH 3/9] Optimize the emqx_zone module using persistent_term - Don't reload the zone options for updating persistent_term is expensive - Use '{?MODULE, Zone, Key}' as the key to avoid name collision --- src/emqx_zone.erl | 24 ++++++++++-------------- 1 file changed, 10 insertions(+), 14 deletions(-) diff --git a/src/emqx_zone.erl b/src/emqx_zone.erl index 225d0ec4a..65deeea24 100644 --- a/src/emqx_zone.erl +++ b/src/emqx_zone.erl @@ -41,8 +41,12 @@ , code_change/3 ]). +%% dummy state +-record(state, {}). + -define(TAB, ?MODULE). -define(SERVER, ?MODULE). +-define(KEY(Zone, Key), {?MODULE, Zone, Key}). %%------------------------------------------------------------------------------ %% APIs @@ -62,7 +66,7 @@ get_env(Zone, Key) -> get_env(undefined, Key, Def) -> emqx_config:get_env(Key, Def); get_env(Zone, Key, Def) -> - try persistent_term:get({Zone, Key}) + try persistent_term:get(?KEY(Zone, Key)) catch error:badarg -> emqx_config:get_env(Key, Def) end. @@ -84,7 +88,8 @@ stop() -> %%------------------------------------------------------------------------------ init([]) -> - {ok, element(2, handle_info(reload, #{timer => undefined}))}. + _ = do_reload(), + {ok, #state{}}. handle_call(force_reload, _From, State) -> _ = do_reload(), @@ -95,17 +100,13 @@ handle_call(Req, _From, State) -> {reply, ignored, State}. handle_cast({set_env, Zone, Key, Val}, State) -> - persistent_term:put({Zone, Key}, Val), + ok = persistent_term:put(?KEY(Zone, Key), Val), {noreply, State}; handle_cast(Msg, State) -> ?LOG(error, "[Zone] Unexpected cast: ~p", [Msg]), {noreply, State}. -handle_info(reload, State) -> - _ = do_reload(), - {noreply, ensure_reload_timer(State#{timer := undefined}), hibernate}; - handle_info(Info, State) -> ?LOG(error, "[Zone] Unexpected info: ~p", [Info]), {noreply, State}. @@ -121,11 +122,6 @@ code_change(_OldVsn, State, _Extra) -> %%------------------------------------------------------------------------------ do_reload() -> - [[persistent_term:put({Zone, Key}, Val) - || {Key, Val} <- Opts] - || {Zone, Opts} <- emqx_config:get_env(zones, [])]. + [ persistent_term:put(?KEY(Zone, Key), Val) + || {Zone, Opts} <- emqx_config:get_env(zones, []), {Key, Val} <- Opts ]. -ensure_reload_timer(State = #{timer := undefined}) -> - State#{timer := erlang:send_after(timer:minutes(5), self(), reload)}; -ensure_reload_timer(State) -> - State. From 365832f9456b3b1bff0dbb951c5669d902471396 Mon Sep 17 00:00:00 2001 From: Gilbert Wong Date: Thu, 30 May 2019 02:07:21 +0800 Subject: [PATCH 4/9] Use new ranch api to elimate warning log for deprecated apis --- src/emqx_listeners.erl | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/src/emqx_listeners.erl b/src/emqx_listeners.erl index 104a9583a..048c019b8 100644 --- a/src/emqx_listeners.erl +++ b/src/emqx_listeners.erl @@ -89,16 +89,18 @@ ranch_opts(Options) -> NumAcceptors = proplists:get_value(acceptors, Options, 4), MaxConnections = proplists:get_value(max_connections, Options, 1024), TcpOptions = proplists:get_value(tcp_options, Options, []), - RanchOpts = [{num_acceptors, NumAcceptors}, {max_connections, MaxConnections} | TcpOptions], + RanchOpts = #{ num_acceptors => NumAcceptors + , max_connections => MaxConnections + , socket_opts => TcpOptions}, case proplists:get_value(ssl_options, Options) of undefined -> RanchOpts; - SslOptions -> RanchOpts ++ SslOptions + SslOptions -> RanchOpts#{socket_opts => TcpOptions ++ SslOptions} end. -with_port(Port, Opts) when is_integer(Port) -> - [{port, Port}|Opts]; -with_port({Addr, Port}, Opts) -> - [{ip, Addr}, {port, Port}|Opts]. +with_port(Port, Opts = #{socket_opts := SocketOption}) when is_integer(Port) -> + Opts#{socket_opts => [{port, Port}| SocketOption]}; +with_port({Addr, Port}, Opts = #{socket_opts := SocketOption}) -> + Opts#{socket_opts => [{ip, Addr}, {port, Port}| SocketOption]}. %% @doc Restart all listeners -spec(restart() -> ok). From 47f9d0e90f130c551550a0db24556d8318f92a20 Mon Sep 17 00:00:00 2001 From: Gilbert Wong Date: Thu, 30 May 2019 09:30:27 +0800 Subject: [PATCH 5/9] Adjust format --- src/emqx_listeners.erl | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/emqx_listeners.erl b/src/emqx_listeners.erl index 048c019b8..65515a8d3 100644 --- a/src/emqx_listeners.erl +++ b/src/emqx_listeners.erl @@ -89,9 +89,9 @@ ranch_opts(Options) -> NumAcceptors = proplists:get_value(acceptors, Options, 4), MaxConnections = proplists:get_value(max_connections, Options, 1024), TcpOptions = proplists:get_value(tcp_options, Options, []), - RanchOpts = #{ num_acceptors => NumAcceptors - , max_connections => MaxConnections - , socket_opts => TcpOptions}, + RanchOpts = #{num_acceptors => NumAcceptors, + max_connections => MaxConnections, + socket_opts => TcpOptions}, case proplists:get_value(ssl_options, Options) of undefined -> RanchOpts; SslOptions -> RanchOpts#{socket_opts => TcpOptions ++ SslOptions} From e7cd32edb7f6ad72696c9c378564363a9fb4247a Mon Sep 17 00:00:00 2001 From: Gilbert Wong Date: Fri, 31 May 2019 11:17:28 +0800 Subject: [PATCH 6/9] Update esockd deps and fix compile warning --- rebar.config | 2 +- src/emqx_protocol.erl | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/rebar.config b/rebar.config index 1f82fef6e..52bb8ead3 100644 --- a/rebar.config +++ b/rebar.config @@ -5,7 +5,7 @@ , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.3.1"}}} , {ekka, {git, "https://github.com/emqx/ekka", {tag, "v0.5.5"}}} , {replayq, {git, "https://github.com/emqx/replayq", {tag, "v0.1.1"}}} - , {esockd, {git, "https://github.com/emqx/esockd", {tag, "v5.4.4"}}} + , {esockd, "5.5.0"} , {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}} ]}. diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index eb86a1841..4d266e8e3 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -1016,7 +1016,7 @@ do_check_banned(_EnableBan = true, Credentials) -> true -> {error, ?RC_BANNED}; false -> ok end; -do_check_banned(_EnableBan, Credentials) -> ok. +do_check_banned(_EnableBan, _Credentials) -> ok. do_acl_check(_EnableAcl = true, Action, Credentials, Topic) -> AllowTerm = ok, From f3fcd16dee907da3519187ee93af762a434204d4 Mon Sep 17 00:00:00 2001 From: Gilbert Wong Date: Mon, 3 Jun 2019 09:36:18 +0800 Subject: [PATCH 7/9] Fix wrong default value in protocol module --- src/emqx_protocol.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 4d266e8e3..b731c55fe 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -949,7 +949,7 @@ do_flapping_detect(Action, #pstate{zone = Zone, client_id = ClientId}) -> ok = case emqx_zone:get_env(Zone, enable_flapping_detect, false) of true -> - Threshold = emqx_zone:get_env(Zone, flapping_threshold, 20), + Threshold = emqx_zone:get_env(Zone, flapping_threshold, {10, 60}), case emqx_flapping:check(Action, ClientId, Threshold) of flapping -> BanExpiryInterval = emqx_zone:get_env(Zone, flapping_ban_expiry_interval, 3600000), From f46205daff6f102b1cac0207ecce36bdaaa74256 Mon Sep 17 00:00:00 2001 From: turtled Date: Mon, 3 Jun 2019 14:42:57 +0800 Subject: [PATCH 8/9] Restart mnesia application --- src/emqx.erl | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/emqx.erl b/src/emqx.erl index ea22a31d3..c126b262c 100644 --- a/src/emqx.erl +++ b/src/emqx.erl @@ -75,6 +75,8 @@ start() -> restart(ConfFile) -> reload_config(ConfFile), shutdown(), + ok = application:stop(mnesia), + application:start(mnesia), reboot(). %% @doc Stop emqx application. From f801ff1e53e72e2ef501195e5b09fd0d4e14bbad Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Wed, 5 Jun 2019 23:16:35 +0800 Subject: [PATCH 9/9] Optimize metrics using counters and persistent_term (#2597) * Optimize emqx_metrics module using persistent_term and counters in OTP 22 --- src/emqx_broker.erl | 8 +- src/emqx_cm.erl | 2 +- src/emqx_connection.erl | 5 +- src/emqx_metrics.erl | 522 ++++++++++++++++++++++-------------- src/emqx_protocol.erl | 4 +- src/emqx_router_helper.erl | 4 +- src/emqx_session.erl | 16 +- src/emqx_shared_sub.erl | 2 +- src/emqx_sm.erl | 4 +- src/emqx_stats.erl | 47 ++-- src/emqx_sys.erl | 7 +- src/emqx_ws_connection.erl | 5 +- test/emqx_broker_SUITE.erl | 8 +- test/emqx_metrics_SUITE.erl | 80 +++--- test/emqx_stats_tests.erl | 34 +-- 15 files changed, 437 insertions(+), 311 deletions(-) diff --git a/src/emqx_broker.erl b/src/emqx_broker.erl index 5f66e5810..cccc0bcd7 100644 --- a/src/emqx_broker.erl +++ b/src/emqx_broker.erl @@ -295,7 +295,7 @@ dispatch({shard, I}, Topic, Msg) -> inc_dropped_cnt(<<"$SYS/", _/binary>>) -> ok; inc_dropped_cnt(_Topic) -> - emqx_metrics:inc('messages/dropped'). + emqx_metrics:inc('messages.dropped'). -spec(subscribers(emqx_topic:topic()) -> [pid()]). subscribers(Topic) when is_binary(Topic) -> @@ -377,9 +377,9 @@ topics() -> %%------------------------------------------------------------------------------ stats_fun() -> - safe_update_stats(?SUBSCRIBER, 'subscribers/count', 'subscribers/max'), - safe_update_stats(?SUBSCRIPTION, 'subscriptions/count', 'subscriptions/max'), - safe_update_stats(?SUBOPTION, 'suboptions/count', 'suboptions/max'). + safe_update_stats(?SUBSCRIBER, 'subscribers.count', 'subscribers.max'), + safe_update_stats(?SUBSCRIPTION, 'subscriptions.count', 'subscriptions.max'), + safe_update_stats(?SUBOPTION, 'suboptions.count', 'suboptions.max'). safe_update_stats(Tab, Stat, MaxStat) -> case ets:info(Tab, size) of diff --git a/src/emqx_cm.erl b/src/emqx_cm.erl index 2a81e0bcb..d78e99465 100644 --- a/src/emqx_cm.erl +++ b/src/emqx_cm.erl @@ -205,5 +205,5 @@ clean_down({Pid, ClientId}) -> stats_fun() -> case ets:info(?CONN_TAB, size) of undefined -> ok; - Size -> emqx_stats:setstat('connections/count', 'connections/max', Size) + Size -> emqx_stats:setstat('connections.count', 'connections.max', Size) end. diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 0bd55756e..2e8d2ac69 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -219,7 +219,7 @@ connected(enter, _, _State) -> %% Handle Input connected(cast, {incoming, Packet = ?PACKET(Type)}, State) -> - _ = emqx_metrics:received(Packet), + ok = emqx_metrics:inc_recv(Packet), (Type == ?PUBLISH) andalso emqx_pd:update_counter(incoming_pubs, 1), handle_packet(Packet, fun(NState) -> {keep_state, NState} @@ -296,7 +296,7 @@ handle(info, {Inet, _Sock, Data}, State) when Inet == tcp; Inet == ssl -> Oct = iolist_size(Data), ?LOG(debug, "[Connection] RECV ~p", [Data]), emqx_pd:update_counter(incoming_bytes, Oct), - emqx_metrics:trans(inc, 'bytes/received', Oct), + ok = emqx_metrics:inc('bytes.received', Oct), NState = ensure_stats_timer(maybe_gc({1, Oct}, State)), process_incoming(Data, [], NState); @@ -336,7 +336,6 @@ handle(info, {timeout, Timer, emit_stats}, State = #state{stats_timer = Timer, proto_state = ProtoState, gc_state = GcState}) -> - emqx_metrics:commit(), emqx_cm:set_conn_stats(emqx_protocol:client_id(ProtoState), stats(State)), NState = State#state{stats_timer = undefined}, Limits = erlang:get(force_shutdown_policy), diff --git a/src/emqx_metrics.erl b/src/emqx_metrics.erl index aeb94019f..cf5939d38 100644 --- a/src/emqx_metrics.erl +++ b/src/emqx_metrics.erl @@ -20,30 +20,31 @@ -include("types.hrl"). -include("emqx_mqtt.hrl"). --export([start_link/0]). +-export([ start_link/0 + , stop/0 + ]). -export([ new/1 + , new/2 , all/0 ]). -export([ val/1 , inc/1 , inc/2 - , inc/3 + , dec/1 , dec/2 - , dec/3 , set/2 ]). -export([ trans/2 , trans/3 - , trans/4 , commit/0 ]). -%% Received/sent metrics --export([ received/1 - , sent/1 +%% Inc received/sent metrics +-export([ inc_recv/1 + , inc_sent/1 ]). %% gen_server callbacks @@ -55,267 +56,320 @@ , code_change/3 ]). -%% Bytes sent and received of Broker +-opaque(metric_idx() :: 1..1024). + +-type(metric_name() :: atom() | string() | binary()). + +-export_type([metric_idx/0]). + +-define(MAX_SIZE, 1024). +-define(RESERVED_IDX, 256). +-define(TAB, ?MODULE). +-define(SERVER, ?MODULE). + +%% Bytes sent and received of broker -define(BYTES_METRICS, [ - {counter, 'bytes/received'}, % Total bytes received - {counter, 'bytes/sent'} % Total bytes sent + {counter, 'bytes.received'}, % Total bytes received + {counter, 'bytes.sent'} % Total bytes sent ]). %% Packets sent and received of broker -define(PACKET_METRICS, [ - {counter, 'packets/received'}, % All Packets received - {counter, 'packets/sent'}, % All Packets sent - {counter, 'packets/connect'}, % CONNECT Packets received - {counter, 'packets/connack'}, % CONNACK Packets sent - {counter, 'packets/publish/received'}, % PUBLISH packets received - {counter, 'packets/publish/sent'}, % PUBLISH packets sent - {counter, 'packets/puback/received'}, % PUBACK packets received - {counter, 'packets/puback/sent'}, % PUBACK packets sent - {counter, 'packets/puback/missed'}, % PUBACK packets missed - {counter, 'packets/pubrec/received'}, % PUBREC packets received - {counter, 'packets/pubrec/sent'}, % PUBREC packets sent - {counter, 'packets/pubrec/missed'}, % PUBREC packets missed - {counter, 'packets/pubrel/received'}, % PUBREL packets received - {counter, 'packets/pubrel/sent'}, % PUBREL packets sent - {counter, 'packets/pubrel/missed'}, % PUBREL packets missed - {counter, 'packets/pubcomp/received'}, % PUBCOMP packets received - {counter, 'packets/pubcomp/sent'}, % PUBCOMP packets sent - {counter, 'packets/pubcomp/missed'}, % PUBCOMP packets missed - {counter, 'packets/subscribe'}, % SUBSCRIBE Packets received - {counter, 'packets/suback'}, % SUBACK packets sent - {counter, 'packets/unsubscribe'}, % UNSUBSCRIBE Packets received - {counter, 'packets/unsuback'}, % UNSUBACK Packets sent - {counter, 'packets/pingreq'}, % PINGREQ packets received - {counter, 'packets/pingresp'}, % PINGRESP Packets sent - {counter, 'packets/disconnect/received'}, % DISCONNECT Packets received - {counter, 'packets/disconnect/sent'}, % DISCONNECT Packets sent - {counter, 'packets/auth'} % Auth Packets received + {counter, 'packets.received'}, % All Packets received + {counter, 'packets.sent'}, % All Packets sent + {counter, 'packets.connect.received'}, % CONNECT Packets received + {counter, 'packets.connack.sent'}, % CONNACK Packets sent + {counter, 'packets.connack.error'}, % CONNACK error sent + {counter, 'packets.connack.auth_error'}, % CONNACK auth_error sent + {counter, 'packets.publish.received'}, % PUBLISH packets received + {counter, 'packets.publish.sent'}, % PUBLISH packets sent + {counter, 'packets.publish.error'}, % PUBLISH failed for error + {counter, 'packets.publish.auth_error'}, % PUBLISH failed for auth error + {counter, 'packets.puback.received'}, % PUBACK packets received + {counter, 'packets.puback.sent'}, % PUBACK packets sent + {counter, 'packets.puback.missed'}, % PUBACK packets missed + {counter, 'packets.pubrec.received'}, % PUBREC packets received + {counter, 'packets.pubrec.sent'}, % PUBREC packets sent + {counter, 'packets.pubrec.missed'}, % PUBREC packets missed + {counter, 'packets.pubrel.received'}, % PUBREL packets received + {counter, 'packets.pubrel.sent'}, % PUBREL packets sent + {counter, 'packets.pubrel.missed'}, % PUBREL packets missed + {counter, 'packets.pubcomp.received'}, % PUBCOMP packets received + {counter, 'packets.pubcomp.sent'}, % PUBCOMP packets sent + {counter, 'packets.pubcomp.missed'}, % PUBCOMP packets missed + {counter, 'packets.subscribe.received'}, % SUBSCRIBE Packets received + {counter, 'packets.subscribe.error'}, % SUBSCRIBE error + {counter, 'packets.subscribe.auth_error'}, % SUBSCRIBE failed for not auth + {counter, 'packets.suback.sent'}, % SUBACK packets sent + {counter, 'packets.unsubscribe.received'}, % UNSUBSCRIBE Packets received + {counter, 'packets.unsubscribe.error'}, % UNSUBSCRIBE error + {counter, 'packets.unsuback.sent'}, % UNSUBACK Packets sent + {counter, 'packets.pingreq.received'}, % PINGREQ packets received + {counter, 'packets.pingresp.sent'}, % PINGRESP Packets sent + {counter, 'packets.disconnect.received'}, % DISCONNECT Packets received + {counter, 'packets.disconnect.sent'}, % DISCONNECT Packets sent + {counter, 'packets.auth.received'}, % Auth Packets received + {counter, 'packets.auth.sent'} % Auth Packets sent ]). %% Messages sent and received of broker -define(MESSAGE_METRICS, [ - {counter, 'messages/received'}, % All Messages received - {counter, 'messages/sent'}, % All Messages sent - {counter, 'messages/qos0/received'}, % QoS0 Messages received - {counter, 'messages/qos0/sent'}, % QoS0 Messages sent - {counter, 'messages/qos1/received'}, % QoS1 Messages received - {counter, 'messages/qos1/sent'}, % QoS1 Messages sent - {counter, 'messages/qos2/received'}, % QoS2 Messages received - {counter, 'messages/qos2/expired'}, % QoS2 Messages expired - {counter, 'messages/qos2/sent'}, % QoS2 Messages sent - {counter, 'messages/qos2/dropped'}, % QoS2 Messages dropped - {gauge, 'messages/retained'}, % Messagea retained - {counter, 'messages/dropped'}, % Messages dropped - {counter, 'messages/expired'}, % Messages expired - {counter, 'messages/forward'} % Messages forward + {counter, 'messages.received'}, % All Messages received + {counter, 'messages.sent'}, % All Messages sent + {counter, 'messages.qos0.received'}, % QoS0 Messages received + {counter, 'messages.qos0.sent'}, % QoS0 Messages sent + {counter, 'messages.qos1.received'}, % QoS1 Messages received + {counter, 'messages.qos1.sent'}, % QoS1 Messages sent + {counter, 'messages.qos2.received'}, % QoS2 Messages received + {counter, 'messages.qos2.expired'}, % QoS2 Messages expired + {counter, 'messages.qos2.sent'}, % QoS2 Messages sent + {counter, 'messages.qos2.dropped'}, % QoS2 Messages dropped + {gauge, 'messages.retained'}, % Messagea retained + {counter, 'messages.dropped'}, % Messages dropped + {counter, 'messages.expired'}, % Messages expired + {counter, 'messages.forward'} % Messages forward ]). --define(TAB, ?MODULE). --define(SERVER, ?MODULE). +-record(state, {next_idx = 1}). + +-record(metric, {name, type, idx}). %% @doc Start the metrics server. -spec(start_link() -> startlink_ret()). start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). +-spec(stop() -> ok). +stop() -> + gen_server:stop(?SERVER). + %%------------------------------------------------------------------------------ %% Metrics API %%------------------------------------------------------------------------------ -new({gauge, Name}) -> - ets:insert(?TAB, {{Name, 0}, 0}); +-spec(new(metric_name()) -> ok). +new(Name) -> + new(counter, Name). -new({counter, Name}) -> - Schedulers = lists:seq(1, emqx_vm:schedulers()), - ets:insert(?TAB, [{{Name, I}, 0} || I <- Schedulers]). +-spec(new(gauge|counter, metric_name()) -> ok). +new(gauge, Name) -> + create(gauge, Name); +new(counter, Name) -> + create(counter, Name). + +%% @private +create(Type, Name) -> + case gen_server:call(?SERVER, {create, Type, Name}) of + {ok, _Idx} -> ok; + {error, Reason} -> error(Reason) + end. %% @doc Get all metrics --spec(all() -> [{atom(), non_neg_integer()}]). +-spec(all() -> [{metric_name(), non_neg_integer()}]). all() -> - maps:to_list( - ets:foldl( - fun({{Metric, _N}, Val}, Map) -> - case maps:find(Metric, Map) of - {ok, Count} -> maps:put(Metric, Count+Val, Map); - error -> maps:put(Metric, Val, Map) - end - end, #{}, ?TAB)). + CRef = persistent_term:get(?MODULE), + [{Name, counters:get(CRef, Idx)} + || #metric{name = Name, idx = Idx} <- ets:tab2list(?TAB)]. %% @doc Get metric value --spec(val(atom()) -> non_neg_integer()). -val(Metric) -> - lists:sum(ets:select(?TAB, [{{{Metric, '_'}, '$1'}, [], ['$1']}])). +-spec(val(metric_name()) -> maybe(non_neg_integer())). +val(Name) -> + case ets:lookup(?TAB, Name) of + [#metric{idx = Idx}] -> + CRef = persistent_term:get(?MODULE), + counters:get(CRef, Idx); + [] -> undefined + end. %% @doc Increase counter --spec(inc(atom()) -> non_neg_integer()). -inc(Metric) -> - inc(counter, Metric, 1). +-spec(inc(metric_name()) -> ok). +inc(Name) -> + inc(Name, 1). %% @doc Increase metric value --spec(inc({counter | gauge, atom()} | atom(), pos_integer()) -> non_neg_integer()). -inc({gauge, Metric}, Val) -> - inc(gauge, Metric, Val); -inc({counter, Metric}, Val) -> - inc(counter, Metric, Val); -inc(Metric, Val) when is_atom(Metric) -> - inc(counter, Metric, Val). - -%% @doc Increase metric value --spec(inc(counter | gauge, atom(), pos_integer()) -> pos_integer()). -inc(Type, Metric, Val) -> - update_counter(key(Type, Metric), {2, Val}). +-spec(inc(metric_name(), pos_integer()) -> ok). +inc(Name, Value) -> + update_counter(Name, Value). %% @doc Decrease metric value --spec(dec(gauge, atom()) -> integer()). -dec(gauge, Metric) -> - dec(gauge, Metric, 1). +-spec(dec(metric_name()) -> ok). +dec(Name) -> + dec(Name, 1). %% @doc Decrease metric value --spec(dec(gauge, atom(), pos_integer()) -> integer()). -dec(gauge, Metric, Val) -> - update_counter(key(gauge, Metric), {2, -Val}). +-spec(dec(metric_name(), pos_integer()) -> ok). +dec(Name, Value) -> + update_counter(Name, -Value). %% @doc Set metric value -set(Metric, Val) when is_atom(Metric) -> - set(gauge, Metric, Val). -set(gauge, Metric, Val) -> - ets:insert(?TAB, {key(gauge, Metric), Val}). +-spec(set(metric_name(), integer()) -> ok). +set(Name, Value) -> + CRef = persistent_term:get(?MODULE), + Idx = ets:lookup_element(?TAB, Name, 4), + counters:put(CRef, Idx, Value). -trans(inc, Metric) -> - trans(inc, {counter, Metric}, 1). +-spec(trans(inc | dec, metric_name()) -> ok). +trans(Op, Name) when Op =:= inc; Op =:= dec -> + trans(Op, Name, 1). -trans(Opt, {gauge, Metric}, Val) -> - trans(Opt, gauge, Metric, Val); -trans(inc, {counter, Metric}, Val) -> - trans(inc, counter, Metric, Val); -trans(inc, Metric, Val) when is_atom(Metric) -> - trans(inc, counter, Metric, Val); -trans(dec, gauge, Metric) -> - trans(dec, gauge, Metric, 1). +-spec(trans(inc | dec, metric_name(), pos_integer()) -> ok). +trans(inc, Name, Value) -> + cache(Name, Value); +trans(dec, Name, Value) -> + cache(Name, -Value). -trans(inc, Type, Metric, Val) -> - hold(Type, Metric, Val); -trans(dec, gauge, Metric, Val) -> - hold(gauge, Metric, -Val). - -hold(Type, Metric, Val) when Type =:= counter orelse Type =:= gauge -> +-spec(cache(metric_name(), integer()) -> ok). +cache(Name, Value) -> put('$metrics', case get('$metrics') of undefined -> - #{{Type, Metric} => Val}; + #{Name => Value}; Metrics -> - maps:update_with({Type, Metric}, fun(Cnt) -> Cnt + Val end, Val, Metrics) - end). + maps:update_with(Name, fun(Cnt) -> Cnt + Value end, Value, Metrics) + end), + ok. +-spec(commit() -> ok). commit() -> case get('$metrics') of undefined -> ok; Metrics -> - maps:fold(fun({Type, Metric}, Val, _Acc) -> - update_counter(key(Type, Metric), {2, Val}) - end, 0, Metrics), - erase('$metrics') + _ = erase('$metrics'), + lists:foreach(fun update_counter/1, maps:to_list(Metrics)) end. -%% @doc Metric key -key(gauge, Metric) -> - {Metric, 0}; -key(counter, Metric) -> - {Metric, erlang:system_info(scheduler_id)}. +update_counter({Name, Value}) -> + update_counter(Name, Value). -update_counter(Key, UpOp) -> - ets:update_counter(?TAB, Key, UpOp). +update_counter(Name, Value) -> + CRef = persistent_term:get(?MODULE), + CIdx = case reserved_idx(Name) of + Idx when is_integer(Idx) -> Idx; + undefined -> + ets:lookup_element(?TAB, Name, 4) + end, + counters:add(CRef, CIdx, Value). -%%----------------------------------------------------------------------------- -%% Received/Sent metrics -%%----------------------------------------------------------------------------- +%%------------------------------------------------------------------------------ +%% Inc Received/Sent metrics +%%------------------------------------------------------------------------------ -%% @doc Count packets received. --spec(received(emqx_mqtt_types:packet()) -> ok). -received(Packet) -> - inc('packets/received'), - received1(Packet). -received1(?PUBLISH_PACKET(QoS, _PktId)) -> - inc('packets/publish/received'), - inc('messages/received'), - qos_received(QoS); -received1(?PACKET(Type)) -> - received2(Type). -received2(?CONNECT) -> - inc('packets/connect'); -received2(?PUBACK) -> - inc('packets/puback/received'); -received2(?PUBREC) -> - inc('packets/pubrec/received'); -received2(?PUBREL) -> - inc('packets/pubrel/received'); -received2(?PUBCOMP) -> - inc('packets/pubcomp/received'); -received2(?SUBSCRIBE) -> - inc('packets/subscribe'); -received2(?UNSUBSCRIBE) -> - inc('packets/unsubscribe'); -received2(?PINGREQ) -> - inc('packets/pingreq'); -received2(?DISCONNECT) -> - inc('packets/disconnect/received'); -received2(_) -> +%% @doc Inc packets received. +-spec(inc_recv(emqx_mqtt_types:packet()) -> ok). +inc_recv(Packet) -> + inc('packets.received'), + do_inc_recv(Packet). + +do_inc_recv(?PACKET(?CONNECT)) -> + inc('packets.connect.received'); +do_inc_recv(?PUBLISH_PACKET(QoS, _PktId)) -> + inc('messages.received'), + case QoS of + ?QOS_0 -> inc('messages.qos0.received'); + ?QOS_1 -> inc('messages.qos1.received'); + ?QOS_2 -> inc('messages.qos2.received') + end, + inc('packets.publish.received'); +do_inc_recv(?PACKET(?PUBACK)) -> + inc('packets.puback.received'); +do_inc_recv(?PACKET(?PUBREC)) -> + inc('packets.pubrec.received'); +do_inc_recv(?PACKET(?PUBREL)) -> + inc('packets.pubrel.received'); +do_inc_recv(?PACKET(?PUBCOMP)) -> + inc('packets.pubcomp.received'); +do_inc_recv(?PACKET(?SUBSCRIBE)) -> + inc('packets.subscribe.received'); +do_inc_recv(?PACKET(?UNSUBSCRIBE)) -> + inc('packets.unsubscribe.received'); +do_inc_recv(?PACKET(?PINGREQ)) -> + inc('packets.pingreq.received'); +do_inc_recv(?PACKET(?DISCONNECT)) -> + inc('packets.disconnect.received'); +do_inc_recv(?PACKET(?AUTH)) -> + inc('packets.auth.received'); +do_inc_recv(_Packet) -> ignore. -qos_received(?QOS_0) -> - inc('messages/qos0/received'); -qos_received(?QOS_1) -> - inc('messages/qos1/received'); -qos_received(?QOS_2) -> - inc('messages/qos2/received'). -%% @doc Count packets received. Will not count $SYS PUBLISH. --spec(sent(emqx_mqtt_types:packet()) -> ignore | non_neg_integer()). -sent(?PUBLISH_PACKET(_QoS, <<"$SYS/", _/binary>>, _, _)) -> +%% @doc Inc packets sent. Will not count $SYS PUBLISH. +-spec(inc_sent(emqx_mqtt_types:packet()) -> ok | ignore). +inc_sent(?PUBLISH_PACKET(_QoS, <<"$SYS/", _/binary>>, _, _)) -> ignore; -sent(Packet) -> - inc('packets/sent'), - sent1(Packet). -sent1(?PUBLISH_PACKET(QoS, _PktId)) -> - inc('packets/publish/sent'), - inc('messages/sent'), - qos_sent(QoS); -sent1(?PACKET(Type)) -> - sent2(Type). -sent2(?CONNACK) -> - inc('packets/connack'); -sent2(?PUBACK) -> - inc('packets/puback/sent'); -sent2(?PUBREC) -> - inc('packets/pubrec/sent'); -sent2(?PUBREL) -> - inc('packets/pubrel/sent'); -sent2(?PUBCOMP) -> - inc('packets/pubcomp/sent'); -sent2(?SUBACK) -> - inc('packets/suback'); -sent2(?UNSUBACK) -> - inc('packets/unsuback'); -sent2(?PINGRESP) -> - inc('packets/pingresp'); -sent2(?DISCONNECT) -> - inc('packets/disconnect/sent'); -sent2(_Type) -> +inc_sent(Packet) -> + inc('packets.sent'), + do_inc_sent(Packet). + +do_inc_sent(?CONNACK_PACKET(ReasonCode)) -> + (ReasonCode == ?RC_SUCCESS) orelse inc('packets.connack.error'), + (ReasonCode == ?RC_NOT_AUTHORIZED) andalso inc('packets.connack.auth_error'), + (ReasonCode == ?RC_BAD_USER_NAME_OR_PASSWORD) andalso inc('packets.connack.auth_error'), + inc('packets.connack.sent'); + +do_inc_sent(?PUBLISH_PACKET(QoS, _PacketId)) -> + inc('messages.sent'), + case QoS of + ?QOS_0 -> inc('messages.qos0.sent'); + ?QOS_1 -> inc('messages.qos1.sent'); + ?QOS_2 -> inc('messages.qos2.sent') + end, + inc('packets.publish.sent'); +do_inc_sent(?PUBACK_PACKET(_PacketId, ReasonCode)) -> + (ReasonCode >= ?RC_UNSPECIFIED_ERROR) andalso inc('packets.publish.error'), + (ReasonCode == ?RC_NOT_AUTHORIZED) andalso inc('packets.publish.auth_error'), + inc('packets.puback.sent'); +do_inc_sent(?PUBREC_PACKET(_PacketId, ReasonCode)) -> + (ReasonCode >= ?RC_UNSPECIFIED_ERROR) andalso inc('packets.publish.error'), + (ReasonCode == ?RC_NOT_AUTHORIZED) andalso inc('packets.publish.auth_error'), + inc('packets.pubrec.sent'); +do_inc_sent(?PACKET(?PUBREL)) -> + inc('packets.pubrel.sent'); +do_inc_sent(?PACKET(?PUBCOMP)) -> + inc('packets.pubcomp.sent'); +do_inc_sent(?PACKET(?SUBACK)) -> + inc('packets.suback.sent'); +do_inc_sent(?PACKET(?UNSUBACK)) -> + inc('packets.unsuback.sent'); +do_inc_sent(?PACKET(?PINGRESP)) -> + inc('packets.pingresp.sent'); +do_inc_sent(?PACKET(?DISCONNECT)) -> + inc('packets.disconnect.sent'); +do_inc_sent(?PACKET(?AUTH)) -> + inc('packets.auth.sent'); +do_inc_sent(_Packet) -> ignore. -qos_sent(?QOS_0) -> - inc('messages/qos0/sent'); -qos_sent(?QOS_1) -> - inc('messages/qos1/sent'); -qos_sent(?QOS_2) -> - inc('messages/qos2/sent'). %%------------------------------------------------------------------------------ %% gen_server callbacks %%------------------------------------------------------------------------------ init([]) -> - % Create metrics table - ok = emqx_tables:new(?TAB, [public, set, {write_concurrency, true}]), - lists:foreach(fun new/1, ?BYTES_METRICS ++ ?PACKET_METRICS ++ ?MESSAGE_METRICS), - {ok, #{}, hibernate}. + % Create counters array + CRef = counters:new(?MAX_SIZE, [write_concurrency]), + ok = persistent_term:put(?MODULE, CRef), + % Create index mapping table + ok = emqx_tables:new(?TAB, [protected, {keypos, 2}, {read_concurrency, true}]), + % Store reserved indices + lists:foreach(fun({Type, Name}) -> + Idx = reserved_idx(Name), + Metric = #metric{name = Name, type = Type, idx = reserved_idx(Name)}, + true = ets:insert(?TAB, Metric), + ok = counters:put(CRef, Idx, 0) + end,?BYTES_METRICS ++ ?PACKET_METRICS ++ ?MESSAGE_METRICS), + {ok, #state{next_idx = ?RESERVED_IDX + 1}, hibernate}. + +handle_call({create, Type, Name}, _From, State = #state{next_idx = ?MAX_SIZE}) -> + ?LOG(error, "[Metrics] Failed to create ~s:~s for index exceeded.", [Type, Name]), + {reply, {error, metric_index_exceeded}, State}; + +handle_call({create, Type, Name}, _From, State = #state{next_idx = NextIdx}) -> + case ets:lookup(?TAB, Name) of + [#metric{idx = Idx}] -> + ?LOG(warning, "[Metrics] ~s already exists.", [Name]), + {reply, {ok, Idx}, State}; + [] -> + Metric = #metric{name = Name, type = Type, idx = NextIdx}, + true = ets:insert(?TAB, Metric), + {reply, {ok, NextIdx}, State#state{next_idx = NextIdx + 1}} + end; handle_call(Req, _From, State) -> ?LOG(error, "[Metrics] Unexpected call: ~p", [Req]), @@ -329,9 +383,65 @@ handle_info(Info, State) -> ?LOG(error, "[Metrics] Unexpected info: ~p", [Info]), {noreply, State}. -terminate(_Reason, #{}) -> +terminate(_Reason, _State) -> ok. code_change(_OldVsn, State, _Extra) -> {ok, State}. +%%------------------------------------------------------------------------------ +%% Internal functions +%%------------------------------------------------------------------------------ + +reserved_idx('bytes.received') -> 01; +reserved_idx('bytes.sent') -> 02; +reserved_idx('packets.received') -> 03; +reserved_idx('packets.sent') -> 04; +reserved_idx('packets.connect.received') -> 05; +reserved_idx('packets.connack.sent') -> 06; +reserved_idx('packets.connack.error') -> 07; +reserved_idx('packets.connack.auth_error') -> 08; +reserved_idx('packets.publish.received') -> 09; +reserved_idx('packets.publish.sent') -> 10; +reserved_idx('packets.publish.error') -> 11; +reserved_idx('packets.publish.auth_error') -> 12; +reserved_idx('packets.puback.received') -> 13; +reserved_idx('packets.puback.sent') -> 14; +reserved_idx('packets.puback.missed') -> 15; +reserved_idx('packets.pubrec.received') -> 16; +reserved_idx('packets.pubrec.sent') -> 17; +reserved_idx('packets.pubrec.missed') -> 18; +reserved_idx('packets.pubrel.received') -> 19; +reserved_idx('packets.pubrel.sent') -> 20; +reserved_idx('packets.pubrel.missed') -> 21; +reserved_idx('packets.pubcomp.received') -> 22; +reserved_idx('packets.pubcomp.sent') -> 23; +reserved_idx('packets.pubcomp.missed') -> 24; +reserved_idx('packets.subscribe.received') -> 25; +reserved_idx('packets.subscribe.error') -> 26; +reserved_idx('packets.subscribe.auth_error') -> 27; +reserved_idx('packets.suback.sent') -> 28; +reserved_idx('packets.unsubscribe.received') -> 29; +reserved_idx('packets.unsubscribe.error') -> 30; +reserved_idx('packets.unsuback.sent') -> 31; +reserved_idx('packets.pingreq.received') -> 32; +reserved_idx('packets.pingresp.sent') -> 33; +reserved_idx('packets.disconnect.received') -> 34; +reserved_idx('packets.disconnect.sent') -> 35; +reserved_idx('packets.auth.received') -> 36; +reserved_idx('packets.auth.sent') -> 37; +reserved_idx('messages.received') -> 38; +reserved_idx('messages.sent') -> 39; +reserved_idx('messages.qos0.received') -> 40; +reserved_idx('messages.qos0.sent') -> 41; +reserved_idx('messages.qos1.received') -> 42; +reserved_idx('messages.qos1.sent') -> 43; +reserved_idx('messages.qos2.received') -> 44; +reserved_idx('messages.qos2.expired') -> 45; +reserved_idx('messages.qos2.sent') -> 46; +reserved_idx('messages.qos2.dropped') -> 47; +reserved_idx('messages.retained') -> 48; +reserved_idx('messages.dropped') -> 49; +reserved_idx('messages.expired') -> 50; +reserved_idx('messages.forward') -> 51; +reserved_idx(_) -> undefined. diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index b731c55fe..57f60c8c7 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -699,8 +699,8 @@ send(Packet = ?PACKET(Type), PState = #pstate{proto_ver = Ver, sendfun = Send}) {ok, PState}; {ok, Data} -> trace(send, Packet), - emqx_metrics:sent(Packet), - emqx_metrics:trans(inc, 'bytes/sent', iolist_size(Data)), + emqx_metrics:inc_sent(Packet), + ok = emqx_metrics:inc('bytes.sent', iolist_size(Data)), {ok, inc_stats(send, Type, PState)}; {error, Reason} -> {error, Reason} diff --git a/src/emqx_router_helper.erl b/src/emqx_router_helper.erl index 0b31929fb..f272b1236 100644 --- a/src/emqx_router_helper.erl +++ b/src/emqx_router_helper.erl @@ -160,8 +160,8 @@ stats_fun() -> case ets:info(?ROUTE, size) of undefined -> ok; Size -> - emqx_stats:setstat('routes/count', 'routes/max', Size), - emqx_stats:setstat('topics/count', 'topics/max', Size) + emqx_stats:setstat('routes.count', 'routes.max', Size), + emqx_stats:setstat('topics.count', 'topics.max', Size) end. cleanup_routes(Node) -> diff --git a/src/emqx_session.erl b/src/emqx_session.erl index d38a28bab..56f29bd4c 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -422,7 +422,7 @@ handle_call({register_publish_packet_id, PacketId, Ts}, _From, end; true -> ?LOG(warning, "[Session] Dropped qos2 packet ~w for too many awaiting_rel", [PacketId]), - emqx_metrics:trans(inc, 'messages/qos2/dropped'), + ok = emqx_metrics:inc('messages.qos2.dropped'), {{error, ?RC_RECEIVE_MAXIMUM_EXCEEDED}, State} end); @@ -434,7 +434,7 @@ handle_call({pubrec, PacketId, _ReasonCode}, _From, State = #state{inflight = In {ok, ensure_stats_timer(acked(pubrec, PacketId, State))}; false -> ?LOG(warning, "[Session] The PUBREC PacketId ~w is not found.", [PacketId]), - emqx_metrics:trans(inc, 'packets/pubrec/missed'), + ok = emqx_metrics:inc('packets.pubrec.missed'), {{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}, State} end); @@ -446,7 +446,7 @@ handle_call({pubrel, PacketId, _ReasonCode}, _From, State = #state{awaiting_rel {ok, ensure_stats_timer(State#state{awaiting_rel = AwaitingRel1})}; error -> ?LOG(warning, "[Session] The PUBREL PacketId ~w is not found", [PacketId]), - emqx_metrics:trans(inc, 'packets/pubrel/missed'), + ok = emqx_metrics:inc('packets.pubrel.missed'), {{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}, State} end); @@ -496,7 +496,7 @@ handle_cast({puback, PacketId, _ReasonCode}, State = #state{inflight = Inflight} ensure_stats_timer(dequeue(acked(puback, PacketId, State))); false -> ?LOG(warning, "[Session] The PUBACK PacketId ~w is not found", [PacketId]), - emqx_metrics:trans(inc, 'packets/puback/missed'), + ok = emqx_metrics:inc('packets.puback.missed'), State end); @@ -508,7 +508,7 @@ handle_cast({pubcomp, PacketId, _ReasonCode}, State = #state{inflight = Inflight ensure_stats_timer(dequeue(acked(pubcomp, PacketId, State))); false -> ?LOG(warning, "[Session] The PUBCOMP PacketId ~w is not found", [PacketId]), - emqx_metrics:trans(inc, 'packets/pubcomp/missed'), + ok = emqx_metrics:inc('packets.pubcomp.missed'), State end); @@ -587,7 +587,6 @@ handle_info({timeout, Timer, emit_stats}, State = #state{client_id = ClientId, stats_timer = Timer, gc_state = GcState}) -> - emqx_metrics:commit(), _ = emqx_sm:set_session_stats(ClientId, stats(State)), NewState = State#state{stats_timer = undefined}, Limits = erlang:get(force_shutdown_policy), @@ -652,7 +651,6 @@ terminate(Reason, #state{will_msg = WillMsg, username = Username, conn_pid = ConnPid, old_conn_pid = OldConnPid}) -> - emqx_metrics:commit(), send_willmsg(WillMsg), [maybe_shutdown(Pid, Reason) || Pid <- [ConnPid, OldConnPid]], ok = emqx_hooks:run('session.terminated', [#{client_id => ClientId, username => Username}, Reason]). @@ -728,7 +726,7 @@ retry_delivery(Force, [{Type, Msg0, Ts} | Msgs], Now, {publish, {PacketId, Msg}} -> case emqx_message:is_expired(Msg) of true -> - emqx_metrics:trans(inc, 'messages/expired'), + ok = emqx_metrics:inc('messages.expired'), emqx_inflight:delete(PacketId, Inflight); false -> redeliver({PacketId, Msg}, State), @@ -770,7 +768,7 @@ expire_awaiting_rel([{PacketId, Ts} | More], Now, Timeout = get_env(Zone, await_rel_timeout), case (timer:now_diff(Now, Ts) div 1000) of Age when Age >= Timeout -> - emqx_metrics:trans(inc, 'messages/qos2/expired'), + ok = emqx_metrics:inc('messages.qos2.expired'), ?LOG(warning, "[Session] Dropped qos2 packet ~s for await_rel_timeout", [PacketId]), expire_awaiting_rel(More, Now, State#state{awaiting_rel = maps:remove(PacketId, AwaitingRel)}); Age -> diff --git a/src/emqx_shared_sub.erl b/src/emqx_shared_sub.erl index 5f1962542..48eba293a 100644 --- a/src/emqx_shared_sub.erl +++ b/src/emqx_shared_sub.erl @@ -364,7 +364,7 @@ cleanup_down(SubPid) -> end, mnesia:dirty_match_object(#emqx_shared_subscription{_ = '_', subpid = SubPid})). update_stats(State) -> - emqx_stats:setstat('subscriptions/shared/count', 'subscriptions/shared/max', ets:info(?TAB, size)), + emqx_stats:setstat('subscriptions.shared.count', 'subscriptions.shared.max', ets:info(?TAB, size)), State. %% Return 'true' if the subscriber process is alive AND not in the failed list diff --git a/src/emqx_sm.erl b/src/emqx_sm.erl index a5c145601..a122a05fe 100644 --- a/src/emqx_sm.erl +++ b/src/emqx_sm.erl @@ -284,8 +284,8 @@ clean_down(Session = {ClientId, SessPid}) -> end. stats_fun() -> - safe_update_stats(?SESSION_TAB, 'sessions/count', 'sessions/max'), - safe_update_stats(?SESSION_P_TAB, 'sessions/persistent/count', 'sessions/persistent/max'). + safe_update_stats(?SESSION_TAB, 'sessions.count', 'sessions.max'), + safe_update_stats(?SESSION_P_TAB, 'sessions.persistent.count', 'sessions.persistent.max'). safe_update_stats(Tab, Stat, MaxStat) -> case ets:info(Tab, size) of diff --git a/src/emqx_stats.erl b/src/emqx_stats.erl index 771069704..c75bd742d 100644 --- a/src/emqx_stats.erl +++ b/src/emqx_stats.erl @@ -48,8 +48,8 @@ ]). -record(update, {name, countdown, interval, func}). --record(state, {timer, updates :: [#update{}], - tick_ms :: timeout()}). + +-record(state, {timer, updates :: [#update{}], tick_ms :: timeout()}). -type(stats() :: list({atom(), non_neg_integer()})). @@ -57,41 +57,41 @@ %% Connection stats -define(CONNECTION_STATS, [ - 'connections/count', % current connections - 'connections/max' % maximum connections connected + 'connections.count', % current connections + 'connections.max' % maximum connections connected ]). %% Session stats -define(SESSION_STATS, [ - 'sessions/count', - 'sessions/max', - 'sessions/persistent/count', - 'sessions/persistent/max' + 'sessions.count', + 'sessions.max', + 'sessions.persistent.count', + 'sessions.persistent.max' ]). %% Subscribers, Subscriptions stats -define(PUBSUB_STATS, [ - 'topics/count', - 'topics/max', - 'suboptions/count', - 'suboptions/max', - 'subscribers/count', - 'subscribers/max', - 'subscriptions/count', - 'subscriptions/max', - 'subscriptions/shared/count', - 'subscriptions/shared/max' + 'topics.count', + 'topics.max', + 'suboptions.count', + 'suboptions.max', + 'subscribers.count', + 'subscribers.max', + 'subscriptions.count', + 'subscriptions.max', + 'subscriptions.shared.count', + 'subscriptions.shared.max' ]). -define(ROUTE_STATS, [ - 'routes/count', - 'routes/max' + 'routes.count', + 'routes.max' ]). %% Retained stats -define(RETAINED_STATS, [ - 'retained/count', - 'retained/max' + 'retained.count', + 'retained.max' ]). -define(TAB, ?MODULE). @@ -181,7 +181,8 @@ start_timer(#state{tick_ms = Ms} = State) -> State#state{timer = emqx_misc:start_timer(Ms, tick)}. handle_call(stop, _From, State) -> - {stop, normal, _Reply = ok, State}; + {stop, normal, ok, State}; + handle_call(Req, _From, State) -> ?LOG(error, "[Stats] Unexpected call: ~p", [Req]), {reply, ignored, State}. diff --git a/src/emqx_sys.erl b/src/emqx_sys.erl index 21dfd58df..90b24e122 100644 --- a/src/emqx_sys.erl +++ b/src/emqx_sys.erl @@ -184,8 +184,11 @@ publish(stats, Stats) -> [safe_publish(systop(lists:concat(['stats/', Stat])), integer_to_binary(Val)) || {Stat, Val} <- Stats, is_atom(Stat), is_integer(Val)]; publish(metrics, Metrics) -> - [safe_publish(systop(lists:concat(['metrics/', Metric])), integer_to_binary(Val)) - || {Metric, Val} <- Metrics, is_atom(Metric), is_integer(Val)]. + [safe_publish(systop(metric_topic(Name)), integer_to_binary(Val)) + || {Name, Val} <- Metrics, is_atom(Name), is_integer(Val)]. + +metric_topic(Name) -> + lists:concat(["metrics/", string:replace(atom_to_list(Name), ".", "/", all)]). safe_publish(Topic, Payload) -> safe_publish(Topic, #{}, Payload). diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index e828dd0a2..d635a0caa 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -190,12 +190,12 @@ websocket_handle({binary, Data}, State = #state{parse_state = ParseState, ?LOG(debug, "[WS Connection] RECV ~p", [Data]), BinSize = iolist_size(Data), emqx_pd:update_counter(recv_oct, BinSize), - emqx_metrics:trans(inc, 'bytes/received', BinSize), + ok = emqx_metrics:inc('bytes.received', BinSize), try emqx_frame:parse(iolist_to_binary(Data), ParseState) of {more, ParseState1} -> {ok, State#state{parse_state = ParseState1}}; {ok, Packet, Rest} -> - emqx_metrics:received(Packet), + ok = emqx_metrics:inc_recv(Packet), emqx_pd:update_counter(recv_cnt, 1), case emqx_protocol:received(Packet, ProtoState) of {ok, ProtoState1} -> @@ -255,7 +255,6 @@ websocket_info({deliver, PubOrAck}, State = #state{proto_state = ProtoState}) -> websocket_info({timeout, Timer, emit_stats}, State = #state{stats_timer = Timer, proto_state = ProtoState}) -> - emqx_metrics:commit(), emqx_cm:set_conn_stats(emqx_protocol:client_id(ProtoState), stats(State)), {ok, State#state{stats_timer = undefined}, hibernate}; diff --git a/test/emqx_broker_SUITE.erl b/test/emqx_broker_SUITE.erl index e2cfe638d..14cd9e4ca 100644 --- a/test/emqx_broker_SUITE.erl +++ b/test/emqx_broker_SUITE.erl @@ -157,13 +157,13 @@ start_session(_) -> %% Metric Group %%-------------------------------------------------------------------- inc_dec_metric(_) -> - emqx_metrics:inc(gauge, 'messages/retained', 10), - emqx_metrics:dec(gauge, 'messages/retained', 10). + emqx_metrics:inc('messages.retained', 10), + emqx_metrics:dec('messages.retained', 10). %%-------------------------------------------------------------------- %% Stats Group %%-------------------------------------------------------------------- set_get_stat(_) -> - emqx_stats:setstat('retained/max', 99), - 99 = emqx_stats:getstat('retained/max'). + emqx_stats:setstat('retained.max', 99), + 99 = emqx_stats:getstat('retained.max'). diff --git a/test/emqx_metrics_SUITE.erl b/test/emqx_metrics_SUITE.erl index ff9dee8c1..b100d44e5 100644 --- a/test/emqx_metrics_SUITE.erl +++ b/test/emqx_metrics_SUITE.erl @@ -18,41 +18,57 @@ -compile(nowarn_export_all). -include("emqx_mqtt.hrl"). +-include_lib("eunit/include/eunit.hrl"). -all() -> [t_inc_dec_metrics, t_trans]. +all() -> [t_inc_dec, t_inc_recv, t_inc_sent, t_trans]. -t_inc_dec_metrics(_) -> +t_inc_dec(_) -> {ok, _} = emqx_metrics:start_link(), - {0, 0} = {emqx_metrics:val('bytes/received'), emqx_metrics:val('messages/retained')}, - emqx_metrics:inc('bytes/received'), - emqx_metrics:inc({counter, 'bytes/received'}, 2), - emqx_metrics:inc(counter, 'bytes/received', 1), - emqx_metrics:inc('bytes/received', 1), - emqx_metrics:inc({gauge, 'messages/retained'}, 2), - emqx_metrics:inc(gauge, 'messages/retained', 2), - {5, 4} = {emqx_metrics:val('bytes/received'), emqx_metrics:val('messages/retained')}, - emqx_metrics:dec(gauge, 'messages/retained'), - emqx_metrics:dec(gauge, 'messages/retained', 1), - 2 = emqx_metrics:val('messages/retained'), - emqx_metrics:set('messages/retained', 3), - 3 = emqx_metrics:val('messages/retained'), - emqx_metrics:received(#mqtt_packet{header = #mqtt_packet_header{type = ?CONNECT}}), - {1, 1} = {emqx_metrics:val('packets/received'), emqx_metrics:val('packets/connect')}, - emqx_metrics:sent(#mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK}}), - {1, 1} = {emqx_metrics:val('packets/sent'), emqx_metrics:val('packets/connack')}. + ?assertEqual(0, emqx_metrics:val('bytes.received')), + ?assertEqual(0, emqx_metrics:val('messages.retained')), + ok = emqx_metrics:inc('bytes.received'), + ok = emqx_metrics:inc('bytes.received', 2), + ok = emqx_metrics:inc('bytes.received', 2), + ?assertEqual(5, emqx_metrics:val('bytes.received')), + ok = emqx_metrics:inc('messages.retained', 2), + ok = emqx_metrics:inc('messages.retained', 2), + ?assertEqual(4, emqx_metrics:val('messages.retained')), + ok = emqx_metrics:dec('messages.retained'), + ok = emqx_metrics:dec('messages.retained', 1), + ?assertEqual(2, emqx_metrics:val('messages.retained')), + ok = emqx_metrics:set('messages.retained', 3), + ?assertEqual(3, emqx_metrics:val('messages.retained')), + ok = emqx_metrics:stop(). + +t_inc_recv(_) -> + {ok, _} = emqx_metrics:start_link(), + ok = emqx_metrics:inc_recv(?PACKET(?CONNECT)), + ?assertEqual(1, emqx_metrics:val('packets.received')), + ?assertEqual(1, emqx_metrics:val('packets.connect.received')), + ok = emqx_metrics:stop(). + +t_inc_sent(_) -> + {ok, _} = emqx_metrics:start_link(), + ok = emqx_metrics:inc_sent(?CONNACK_PACKET(0)), + ?assertEqual(1, emqx_metrics:val('packets.sent')), + ?assertEqual(1, emqx_metrics:val('packets.connack.sent')), + ok = emqx_metrics:stop(). t_trans(_) -> {ok, _} = emqx_metrics:start_link(), - emqx_metrics:trans(inc, 'bytes/received'), - emqx_metrics:trans(inc, {counter, 'bytes/received'}, 2), - emqx_metrics:trans(inc, counter, 'bytes/received', 2), - emqx_metrics:trans(inc, {gauge, 'messages/retained'}, 2), - emqx_metrics:trans(inc, gauge, 'messages/retained', 2), - {0, 0} = {emqx_metrics:val('bytes/received'), emqx_metrics:val('messages/retained')}, - emqx_metrics:commit(), - {5, 4} = {emqx_metrics:val('bytes/received'), emqx_metrics:val('messages/retained')}, - emqx_metrics:trans(dec, gauge, 'messages/retained'), - emqx_metrics:trans(dec, gauge, 'messages/retained', 1), - 4 = emqx_metrics:val('messages/retained'), - emqx_metrics:commit(), - 2 = emqx_metrics:val('messages/retained'). + ok = emqx_metrics:trans(inc, 'bytes.received'), + ok = emqx_metrics:trans(inc, 'bytes.received', 2), + ?assertEqual(0, emqx_metrics:val('bytes.received')), + ok = emqx_metrics:trans(inc, 'messages.retained', 2), + ok = emqx_metrics:trans(inc, 'messages.retained', 2), + ?assertEqual(0, emqx_metrics:val('messages.retained')), + ok = emqx_metrics:commit(), + ?assertEqual(3, emqx_metrics:val('bytes.received')), + ?assertEqual(4, emqx_metrics:val('messages.retained')), + ok = emqx_metrics:trans(dec, 'messages.retained'), + ok = emqx_metrics:trans(dec, 'messages.retained', 1), + ?assertEqual(4, emqx_metrics:val('messages.retained')), + ok = emqx_metrics:commit(), + ?assertEqual(2, emqx_metrics:val('messages.retained')), + ok = emqx_metrics:stop(). + diff --git a/test/emqx_stats_tests.erl b/test/emqx_stats_tests.erl index 7170c422b..c88f3c831 100644 --- a/test/emqx_stats_tests.erl +++ b/test/emqx_stats_tests.erl @@ -18,27 +18,27 @@ get_state_test() -> with_proc(fun() -> - SetConnsCount = emqx_stats:statsfun('connections/count'), + SetConnsCount = emqx_stats:statsfun('connections.count'), SetConnsCount(1), - 1 = emqx_stats:getstat('connections/count'), - emqx_stats:setstat('connections/count', 2), - 2 = emqx_stats:getstat('connections/count'), - emqx_stats:setstat('connections/count', 'connections/max', 3), + 1 = emqx_stats:getstat('connections.count'), + emqx_stats:setstat('connections.count', 2), + 2 = emqx_stats:getstat('connections.count'), + emqx_stats:setstat('connections.count', 'connections.max', 3), timer:sleep(100), - 3 = emqx_stats:getstat('connections/count'), - 3 = emqx_stats:getstat('connections/max'), - emqx_stats:setstat('connections/count', 'connections/max', 2), + 3 = emqx_stats:getstat('connections.count'), + 3 = emqx_stats:getstat('connections.max'), + emqx_stats:setstat('connections.count', 'connections.max', 2), timer:sleep(100), - 2 = emqx_stats:getstat('connections/count'), - 3 = emqx_stats:getstat('connections/max'), - SetConns = emqx_stats:statsfun('connections/count', 'connections/max'), + 2 = emqx_stats:getstat('connections.count'), + 3 = emqx_stats:getstat('connections.max'), + SetConns = emqx_stats:statsfun('connections.count', 'connections.max'), SetConns(4), timer:sleep(100), - 4 = emqx_stats:getstat('connections/count'), - 4 = emqx_stats:getstat('connections/max'), + 4 = emqx_stats:getstat('connections.count'), + 4 = emqx_stats:getstat('connections.max'), Conns = emqx_stats:getstats(), - 4 = proplists:get_value('connections/count', Conns), - 4 = proplists:get_value('connections/max', Conns) + 4 = proplists:get_value('connections.count', Conns), + 4 = proplists:get_value('connections.max', Conns) end). update_interval_test() -> @@ -46,10 +46,10 @@ update_interval_test() -> with_proc(fun() -> SleepMs = TickMs * 2 + TickMs div 2, %% sleep for 2.5 ticks emqx_stats:cancel_update(cm_stats), - UpdFun = fun() -> emqx_stats:setstat('connections/count', 1) end, + UpdFun = fun() -> emqx_stats:setstat('connections.count', 1) end, ok = emqx_stats:update_interval(stats_test, UpdFun), timer:sleep(SleepMs), - ?assertEqual(1, emqx_stats:getstat('connections/count')) + ?assertEqual(1, emqx_stats:getstat('connections.count')) end, TickMs). helper_test_() ->